Merge git://git.kernel.org/pub/scm/linux/kernel/git/pablo/nf-next
[sfrench/cifs-2.6.git] / net / tipc / socket.c
1 /*
2 * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "port.h"
39 #include "node.h"
40
41 #include <linux/export.h>
42
43 #define SS_LISTENING    -1      /* socket is listening */
44 #define SS_READY        -2      /* socket is connectionless */
45
46 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
47
48 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
49 static void tipc_data_ready(struct sock *sk);
50 static void tipc_write_space(struct sock *sk);
51 static int tipc_release(struct socket *sock);
52 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
53
54 static const struct proto_ops packet_ops;
55 static const struct proto_ops stream_ops;
56 static const struct proto_ops msg_ops;
57
58 static struct proto tipc_proto;
59 static struct proto tipc_proto_kern;
60
61 /*
62  * Revised TIPC socket locking policy:
63  *
64  * Most socket operations take the standard socket lock when they start
65  * and hold it until they finish (or until they need to sleep).  Acquiring
66  * this lock grants the owner exclusive access to the fields of the socket
67  * data structures, with the exception of the backlog queue.  A few socket
68  * operations can be done without taking the socket lock because they only
69  * read socket information that never changes during the life of the socket.
70  *
71  * Socket operations may acquire the lock for the associated TIPC port if they
72  * need to perform an operation on the port.  If any routine needs to acquire
73  * both the socket lock and the port lock it must take the socket lock first
74  * to avoid the risk of deadlock.
75  *
76  * The dispatcher handling incoming messages cannot grab the socket lock in
77  * the standard fashion, since invoked it runs at the BH level and cannot block.
78  * Instead, it checks to see if the socket lock is currently owned by someone,
79  * and either handles the message itself or adds it to the socket's backlog
80  * queue; in the latter case the queued message is processed once the process
81  * owning the socket lock releases it.
82  *
83  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
84  * the problem of a blocked socket operation preventing any other operations
85  * from occurring.  However, applications must be careful if they have
86  * multiple threads trying to send (or receive) on the same socket, as these
87  * operations might interfere with each other.  For example, doing a connect
88  * and a receive at the same time might allow the receive to consume the
89  * ACK message meant for the connect.  While additional work could be done
90  * to try and overcome this, it doesn't seem to be worthwhile at the present.
91  *
92  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
93  * that another operation that must be performed in a non-blocking manner is
94  * not delayed for very long because the lock has already been taken.
95  *
96  * NOTE: This code assumes that certain fields of a port/socket pair are
97  * constant over its lifetime; such fields can be examined without taking
98  * the socket lock and/or port lock, and do not need to be re-read even
99  * after resuming processing after waiting.  These fields include:
100  *   - socket type
101  *   - pointer to socket sk structure (aka tipc_sock structure)
102  *   - pointer to port structure
103  *   - port reference
104  */
105
106 #include "socket.h"
107
108 /**
109  * advance_rx_queue - discard first buffer in socket receive queue
110  *
111  * Caller must hold socket lock
112  */
113 static void advance_rx_queue(struct sock *sk)
114 {
115         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
116 }
117
118 /**
119  * reject_rx_queue - reject all buffers in socket receive queue
120  *
121  * Caller must hold socket lock
122  */
123 static void reject_rx_queue(struct sock *sk)
124 {
125         struct sk_buff *buf;
126
127         while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
128                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
129 }
130
131 /**
132  * tipc_sk_create - create a TIPC socket
133  * @net: network namespace (must be default network)
134  * @sock: pre-allocated socket structure
135  * @protocol: protocol indicator (must be 0)
136  * @kern: caused by kernel or by userspace?
137  *
138  * This routine creates additional data structures used by the TIPC socket,
139  * initializes them, and links them together.
140  *
141  * Returns 0 on success, errno otherwise
142  */
143 static int tipc_sk_create(struct net *net, struct socket *sock,
144                           int protocol, int kern)
145 {
146         const struct proto_ops *ops;
147         socket_state state;
148         struct sock *sk;
149         struct tipc_sock *tsk;
150         struct tipc_port *port;
151         u32 ref;
152
153         /* Validate arguments */
154         if (unlikely(protocol != 0))
155                 return -EPROTONOSUPPORT;
156
157         switch (sock->type) {
158         case SOCK_STREAM:
159                 ops = &stream_ops;
160                 state = SS_UNCONNECTED;
161                 break;
162         case SOCK_SEQPACKET:
163                 ops = &packet_ops;
164                 state = SS_UNCONNECTED;
165                 break;
166         case SOCK_DGRAM:
167         case SOCK_RDM:
168                 ops = &msg_ops;
169                 state = SS_READY;
170                 break;
171         default:
172                 return -EPROTOTYPE;
173         }
174
175         /* Allocate socket's protocol area */
176         if (!kern)
177                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
178         else
179                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
180
181         if (sk == NULL)
182                 return -ENOMEM;
183
184         tsk = tipc_sk(sk);
185         port = &tsk->port;
186
187         ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
188         if (!ref) {
189                 pr_warn("Socket registration failed, ref. table exhausted\n");
190                 sk_free(sk);
191                 return -ENOMEM;
192         }
193
194         /* Finish initializing socket data structures */
195         sock->ops = ops;
196         sock->state = state;
197
198         sock_init_data(sock, sk);
199         sk->sk_backlog_rcv = tipc_backlog_rcv;
200         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
201         sk->sk_data_ready = tipc_data_ready;
202         sk->sk_write_space = tipc_write_space;
203         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
204         atomic_set(&tsk->dupl_rcvcnt, 0);
205         tipc_port_unlock(port);
206
207         if (sock->state == SS_READY) {
208                 tipc_port_set_unreturnable(port, true);
209                 if (sock->type == SOCK_DGRAM)
210                         tipc_port_set_unreliable(port, true);
211         }
212         return 0;
213 }
214
215 /**
216  * tipc_sock_create_local - create TIPC socket from inside TIPC module
217  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
218  *
219  * We cannot use sock_creat_kern here because it bumps module user count.
220  * Since socket owner and creator is the same module we must make sure
221  * that module count remains zero for module local sockets, otherwise
222  * we cannot do rmmod.
223  *
224  * Returns 0 on success, errno otherwise
225  */
226 int tipc_sock_create_local(int type, struct socket **res)
227 {
228         int rc;
229
230         rc = sock_create_lite(AF_TIPC, type, 0, res);
231         if (rc < 0) {
232                 pr_err("Failed to create kernel socket\n");
233                 return rc;
234         }
235         tipc_sk_create(&init_net, *res, 0, 1);
236
237         return 0;
238 }
239
240 /**
241  * tipc_sock_release_local - release socket created by tipc_sock_create_local
242  * @sock: the socket to be released.
243  *
244  * Module reference count is not incremented when such sockets are created,
245  * so we must keep it from being decremented when they are released.
246  */
247 void tipc_sock_release_local(struct socket *sock)
248 {
249         tipc_release(sock);
250         sock->ops = NULL;
251         sock_release(sock);
252 }
253
254 /**
255  * tipc_sock_accept_local - accept a connection on a socket created
256  * with tipc_sock_create_local. Use this function to avoid that
257  * module reference count is inadvertently incremented.
258  *
259  * @sock:    the accepting socket
260  * @newsock: reference to the new socket to be created
261  * @flags:   socket flags
262  */
263
264 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
265                            int flags)
266 {
267         struct sock *sk = sock->sk;
268         int ret;
269
270         ret = sock_create_lite(sk->sk_family, sk->sk_type,
271                                sk->sk_protocol, newsock);
272         if (ret < 0)
273                 return ret;
274
275         ret = tipc_accept(sock, *newsock, flags);
276         if (ret < 0) {
277                 sock_release(*newsock);
278                 return ret;
279         }
280         (*newsock)->ops = sock->ops;
281         return ret;
282 }
283
284 /**
285  * tipc_release - destroy a TIPC socket
286  * @sock: socket to destroy
287  *
288  * This routine cleans up any messages that are still queued on the socket.
289  * For DGRAM and RDM socket types, all queued messages are rejected.
290  * For SEQPACKET and STREAM socket types, the first message is rejected
291  * and any others are discarded.  (If the first message on a STREAM socket
292  * is partially-read, it is discarded and the next one is rejected instead.)
293  *
294  * NOTE: Rejected messages are not necessarily returned to the sender!  They
295  * are returned or discarded according to the "destination droppable" setting
296  * specified for the message by the sender.
297  *
298  * Returns 0 on success, errno otherwise
299  */
300 static int tipc_release(struct socket *sock)
301 {
302         struct sock *sk = sock->sk;
303         struct tipc_sock *tsk;
304         struct tipc_port *port;
305         struct sk_buff *buf;
306
307         /*
308          * Exit if socket isn't fully initialized (occurs when a failed accept()
309          * releases a pre-allocated child socket that was never used)
310          */
311         if (sk == NULL)
312                 return 0;
313
314         tsk = tipc_sk(sk);
315         port = &tsk->port;
316         lock_sock(sk);
317
318         /*
319          * Reject all unreceived messages, except on an active connection
320          * (which disconnects locally & sends a 'FIN+' to peer)
321          */
322         while (sock->state != SS_DISCONNECTING) {
323                 buf = __skb_dequeue(&sk->sk_receive_queue);
324                 if (buf == NULL)
325                         break;
326                 if (TIPC_SKB_CB(buf)->handle != NULL)
327                         kfree_skb(buf);
328                 else {
329                         if ((sock->state == SS_CONNECTING) ||
330                             (sock->state == SS_CONNECTED)) {
331                                 sock->state = SS_DISCONNECTING;
332                                 tipc_port_disconnect(port->ref);
333                         }
334                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
335                 }
336         }
337
338         /* Destroy TIPC port; also disconnects an active connection and
339          * sends a 'FIN-' to peer.
340          */
341         tipc_port_destroy(port);
342
343         /* Discard any remaining (connection-based) messages in receive queue */
344         __skb_queue_purge(&sk->sk_receive_queue);
345
346         /* Reject any messages that accumulated in backlog queue */
347         sock->state = SS_DISCONNECTING;
348         release_sock(sk);
349
350         sock_put(sk);
351         sock->sk = NULL;
352
353         return 0;
354 }
355
356 /**
357  * tipc_bind - associate or disassocate TIPC name(s) with a socket
358  * @sock: socket structure
359  * @uaddr: socket address describing name(s) and desired operation
360  * @uaddr_len: size of socket address data structure
361  *
362  * Name and name sequence binding is indicated using a positive scope value;
363  * a negative scope value unbinds the specified name.  Specifying no name
364  * (i.e. a socket address length of 0) unbinds all names from the socket.
365  *
366  * Returns 0 on success, errno otherwise
367  *
368  * NOTE: This routine doesn't need to take the socket lock since it doesn't
369  *       access any non-constant socket information.
370  */
371 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
372                      int uaddr_len)
373 {
374         struct sock *sk = sock->sk;
375         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
376         struct tipc_sock *tsk = tipc_sk(sk);
377         int res = -EINVAL;
378
379         lock_sock(sk);
380         if (unlikely(!uaddr_len)) {
381                 res = tipc_withdraw(&tsk->port, 0, NULL);
382                 goto exit;
383         }
384
385         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
386                 res = -EINVAL;
387                 goto exit;
388         }
389         if (addr->family != AF_TIPC) {
390                 res = -EAFNOSUPPORT;
391                 goto exit;
392         }
393
394         if (addr->addrtype == TIPC_ADDR_NAME)
395                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
396         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
397                 res = -EAFNOSUPPORT;
398                 goto exit;
399         }
400
401         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
402             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
403             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
404                 res = -EACCES;
405                 goto exit;
406         }
407
408         res = (addr->scope > 0) ?
409                 tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
410                 tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
411 exit:
412         release_sock(sk);
413         return res;
414 }
415
416 /**
417  * tipc_getname - get port ID of socket or peer socket
418  * @sock: socket structure
419  * @uaddr: area for returned socket address
420  * @uaddr_len: area for returned length of socket address
421  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
422  *
423  * Returns 0 on success, errno otherwise
424  *
425  * NOTE: This routine doesn't need to take the socket lock since it only
426  *       accesses socket information that is unchanging (or which changes in
427  *       a completely predictable manner).
428  */
429 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
430                         int *uaddr_len, int peer)
431 {
432         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
433         struct tipc_sock *tsk = tipc_sk(sock->sk);
434
435         memset(addr, 0, sizeof(*addr));
436         if (peer) {
437                 if ((sock->state != SS_CONNECTED) &&
438                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
439                         return -ENOTCONN;
440                 addr->addr.id.ref = tipc_port_peerport(&tsk->port);
441                 addr->addr.id.node = tipc_port_peernode(&tsk->port);
442         } else {
443                 addr->addr.id.ref = tsk->port.ref;
444                 addr->addr.id.node = tipc_own_addr;
445         }
446
447         *uaddr_len = sizeof(*addr);
448         addr->addrtype = TIPC_ADDR_ID;
449         addr->family = AF_TIPC;
450         addr->scope = 0;
451         addr->addr.name.domain = 0;
452
453         return 0;
454 }
455
456 /**
457  * tipc_poll - read and possibly block on pollmask
458  * @file: file structure associated with the socket
459  * @sock: socket for which to calculate the poll bits
460  * @wait: ???
461  *
462  * Returns pollmask value
463  *
464  * COMMENTARY:
465  * It appears that the usual socket locking mechanisms are not useful here
466  * since the pollmask info is potentially out-of-date the moment this routine
467  * exits.  TCP and other protocols seem to rely on higher level poll routines
468  * to handle any preventable race conditions, so TIPC will do the same ...
469  *
470  * TIPC sets the returned events as follows:
471  *
472  * socket state         flags set
473  * ------------         ---------
474  * unconnected          no read flags
475  *                      POLLOUT if port is not congested
476  *
477  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
478  *                      no write flags
479  *
480  * connected            POLLIN/POLLRDNORM if data in rx queue
481  *                      POLLOUT if port is not congested
482  *
483  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
484  *                      no write flags
485  *
486  * listening            POLLIN if SYN in rx queue
487  *                      no write flags
488  *
489  * ready                POLLIN/POLLRDNORM if data in rx queue
490  * [connectionless]     POLLOUT (since port cannot be congested)
491  *
492  * IMPORTANT: The fact that a read or write operation is indicated does NOT
493  * imply that the operation will succeed, merely that it should be performed
494  * and will not block.
495  */
496 static unsigned int tipc_poll(struct file *file, struct socket *sock,
497                               poll_table *wait)
498 {
499         struct sock *sk = sock->sk;
500         struct tipc_sock *tsk = tipc_sk(sk);
501         u32 mask = 0;
502
503         sock_poll_wait(file, sk_sleep(sk), wait);
504
505         switch ((int)sock->state) {
506         case SS_UNCONNECTED:
507                 if (!tsk->port.congested)
508                         mask |= POLLOUT;
509                 break;
510         case SS_READY:
511         case SS_CONNECTED:
512                 if (!tsk->port.congested)
513                         mask |= POLLOUT;
514                 /* fall thru' */
515         case SS_CONNECTING:
516         case SS_LISTENING:
517                 if (!skb_queue_empty(&sk->sk_receive_queue))
518                         mask |= (POLLIN | POLLRDNORM);
519                 break;
520         case SS_DISCONNECTING:
521                 mask = (POLLIN | POLLRDNORM | POLLHUP);
522                 break;
523         }
524
525         return mask;
526 }
527
528 /**
529  * dest_name_check - verify user is permitted to send to specified port name
530  * @dest: destination address
531  * @m: descriptor for message to be sent
532  *
533  * Prevents restricted configuration commands from being issued by
534  * unauthorized users.
535  *
536  * Returns 0 if permission is granted, otherwise errno
537  */
538 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
539 {
540         struct tipc_cfg_msg_hdr hdr;
541
542         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
543                 return 0;
544         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
545                 return 0;
546         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
547                 return -EACCES;
548
549         if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
550                 return -EMSGSIZE;
551         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
552                 return -EFAULT;
553         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
554                 return -EACCES;
555
556         return 0;
557 }
558
559 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
560 {
561         struct sock *sk = sock->sk;
562         struct tipc_sock *tsk = tipc_sk(sk);
563         DEFINE_WAIT(wait);
564         int done;
565
566         do {
567                 int err = sock_error(sk);
568                 if (err)
569                         return err;
570                 if (sock->state == SS_DISCONNECTING)
571                         return -EPIPE;
572                 if (!*timeo_p)
573                         return -EAGAIN;
574                 if (signal_pending(current))
575                         return sock_intr_errno(*timeo_p);
576
577                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
578                 done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
579                 finish_wait(sk_sleep(sk), &wait);
580         } while (!done);
581         return 0;
582 }
583
584
585 /**
586  * tipc_sendmsg - send message in connectionless manner
587  * @iocb: if NULL, indicates that socket lock is already held
588  * @sock: socket structure
589  * @m: message to send
590  * @total_len: length of message
591  *
592  * Message must have an destination specified explicitly.
593  * Used for SOCK_RDM and SOCK_DGRAM messages,
594  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
595  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
596  *
597  * Returns the number of bytes sent on success, or errno otherwise
598  */
599 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
600                         struct msghdr *m, size_t total_len)
601 {
602         struct sock *sk = sock->sk;
603         struct tipc_sock *tsk = tipc_sk(sk);
604         struct tipc_port *port = &tsk->port;
605         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
606         int needs_conn;
607         long timeo;
608         int res = -EINVAL;
609
610         if (unlikely(!dest))
611                 return -EDESTADDRREQ;
612         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
613                      (dest->family != AF_TIPC)))
614                 return -EINVAL;
615         if (total_len > TIPC_MAX_USER_MSG_SIZE)
616                 return -EMSGSIZE;
617
618         if (iocb)
619                 lock_sock(sk);
620
621         needs_conn = (sock->state != SS_READY);
622         if (unlikely(needs_conn)) {
623                 if (sock->state == SS_LISTENING) {
624                         res = -EPIPE;
625                         goto exit;
626                 }
627                 if (sock->state != SS_UNCONNECTED) {
628                         res = -EISCONN;
629                         goto exit;
630                 }
631                 if (tsk->port.published) {
632                         res = -EOPNOTSUPP;
633                         goto exit;
634                 }
635                 if (dest->addrtype == TIPC_ADDR_NAME) {
636                         tsk->port.conn_type = dest->addr.name.name.type;
637                         tsk->port.conn_instance = dest->addr.name.name.instance;
638                 }
639
640                 /* Abort any pending connection attempts (very unlikely) */
641                 reject_rx_queue(sk);
642         }
643
644         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
645         do {
646                 if (dest->addrtype == TIPC_ADDR_NAME) {
647                         res = dest_name_check(dest, m);
648                         if (res)
649                                 break;
650                         res = tipc_send2name(port,
651                                              &dest->addr.name.name,
652                                              dest->addr.name.domain,
653                                              m->msg_iov,
654                                              total_len);
655                 } else if (dest->addrtype == TIPC_ADDR_ID) {
656                         res = tipc_send2port(port,
657                                              &dest->addr.id,
658                                              m->msg_iov,
659                                              total_len);
660                 } else if (dest->addrtype == TIPC_ADDR_MCAST) {
661                         if (needs_conn) {
662                                 res = -EOPNOTSUPP;
663                                 break;
664                         }
665                         res = dest_name_check(dest, m);
666                         if (res)
667                                 break;
668                         res = tipc_port_mcast_xmit(port,
669                                                    &dest->addr.nameseq,
670                                                    m->msg_iov,
671                                                    total_len);
672                 }
673                 if (likely(res != -ELINKCONG)) {
674                         if (needs_conn && (res >= 0))
675                                 sock->state = SS_CONNECTING;
676                         break;
677                 }
678                 res = tipc_wait_for_sndmsg(sock, &timeo);
679                 if (res)
680                         break;
681         } while (1);
682
683 exit:
684         if (iocb)
685                 release_sock(sk);
686         return res;
687 }
688
689 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
690 {
691         struct sock *sk = sock->sk;
692         struct tipc_sock *tsk = tipc_sk(sk);
693         struct tipc_port *port = &tsk->port;
694         DEFINE_WAIT(wait);
695         int done;
696
697         do {
698                 int err = sock_error(sk);
699                 if (err)
700                         return err;
701                 if (sock->state == SS_DISCONNECTING)
702                         return -EPIPE;
703                 else if (sock->state != SS_CONNECTED)
704                         return -ENOTCONN;
705                 if (!*timeo_p)
706                         return -EAGAIN;
707                 if (signal_pending(current))
708                         return sock_intr_errno(*timeo_p);
709
710                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
711                 done = sk_wait_event(sk, timeo_p,
712                                      (!port->congested || !port->connected));
713                 finish_wait(sk_sleep(sk), &wait);
714         } while (!done);
715         return 0;
716 }
717
718 /**
719  * tipc_send_packet - send a connection-oriented message
720  * @iocb: if NULL, indicates that socket lock is already held
721  * @sock: socket structure
722  * @m: message to send
723  * @total_len: length of message
724  *
725  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
726  *
727  * Returns the number of bytes sent on success, or errno otherwise
728  */
729 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
730                             struct msghdr *m, size_t total_len)
731 {
732         struct sock *sk = sock->sk;
733         struct tipc_sock *tsk = tipc_sk(sk);
734         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
735         int res = -EINVAL;
736         long timeo;
737
738         /* Handle implied connection establishment */
739         if (unlikely(dest))
740                 return tipc_sendmsg(iocb, sock, m, total_len);
741
742         if (total_len > TIPC_MAX_USER_MSG_SIZE)
743                 return -EMSGSIZE;
744
745         if (iocb)
746                 lock_sock(sk);
747
748         if (unlikely(sock->state != SS_CONNECTED)) {
749                 if (sock->state == SS_DISCONNECTING)
750                         res = -EPIPE;
751                 else
752                         res = -ENOTCONN;
753                 goto exit;
754         }
755
756         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
757         do {
758                 res = tipc_send(&tsk->port, m->msg_iov, total_len);
759                 if (likely(res != -ELINKCONG))
760                         break;
761                 res = tipc_wait_for_sndpkt(sock, &timeo);
762                 if (res)
763                         break;
764         } while (1);
765 exit:
766         if (iocb)
767                 release_sock(sk);
768         return res;
769 }
770
771 /**
772  * tipc_send_stream - send stream-oriented data
773  * @iocb: (unused)
774  * @sock: socket structure
775  * @m: data to send
776  * @total_len: total length of data to be sent
777  *
778  * Used for SOCK_STREAM data.
779  *
780  * Returns the number of bytes sent on success (or partial success),
781  * or errno if no data sent
782  */
783 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
784                             struct msghdr *m, size_t total_len)
785 {
786         struct sock *sk = sock->sk;
787         struct tipc_sock *tsk = tipc_sk(sk);
788         struct msghdr my_msg;
789         struct iovec my_iov;
790         struct iovec *curr_iov;
791         int curr_iovlen;
792         char __user *curr_start;
793         u32 hdr_size;
794         int curr_left;
795         int bytes_to_send;
796         int bytes_sent;
797         int res;
798
799         lock_sock(sk);
800
801         /* Handle special cases where there is no connection */
802         if (unlikely(sock->state != SS_CONNECTED)) {
803                 if (sock->state == SS_UNCONNECTED)
804                         res = tipc_send_packet(NULL, sock, m, total_len);
805                 else
806                         res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
807                 goto exit;
808         }
809
810         if (unlikely(m->msg_name)) {
811                 res = -EISCONN;
812                 goto exit;
813         }
814
815         if (total_len > (unsigned int)INT_MAX) {
816                 res = -EMSGSIZE;
817                 goto exit;
818         }
819
820         /*
821          * Send each iovec entry using one or more messages
822          *
823          * Note: This algorithm is good for the most likely case
824          * (i.e. one large iovec entry), but could be improved to pass sets
825          * of small iovec entries into send_packet().
826          */
827         curr_iov = m->msg_iov;
828         curr_iovlen = m->msg_iovlen;
829         my_msg.msg_iov = &my_iov;
830         my_msg.msg_iovlen = 1;
831         my_msg.msg_flags = m->msg_flags;
832         my_msg.msg_name = NULL;
833         bytes_sent = 0;
834
835         hdr_size = msg_hdr_sz(&tsk->port.phdr);
836
837         while (curr_iovlen--) {
838                 curr_start = curr_iov->iov_base;
839                 curr_left = curr_iov->iov_len;
840
841                 while (curr_left) {
842                         bytes_to_send = tsk->port.max_pkt - hdr_size;
843                         if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
844                                 bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
845                         if (curr_left < bytes_to_send)
846                                 bytes_to_send = curr_left;
847                         my_iov.iov_base = curr_start;
848                         my_iov.iov_len = bytes_to_send;
849                         res = tipc_send_packet(NULL, sock, &my_msg,
850                                                bytes_to_send);
851                         if (res < 0) {
852                                 if (bytes_sent)
853                                         res = bytes_sent;
854                                 goto exit;
855                         }
856                         curr_left -= bytes_to_send;
857                         curr_start += bytes_to_send;
858                         bytes_sent += bytes_to_send;
859                 }
860
861                 curr_iov++;
862         }
863         res = bytes_sent;
864 exit:
865         release_sock(sk);
866         return res;
867 }
868
869 /**
870  * auto_connect - complete connection setup to a remote port
871  * @tsk: tipc socket structure
872  * @msg: peer's response message
873  *
874  * Returns 0 on success, errno otherwise
875  */
876 static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)
877 {
878         struct tipc_port *port = &tsk->port;
879         struct socket *sock = tsk->sk.sk_socket;
880         struct tipc_portid peer;
881
882         peer.ref = msg_origport(msg);
883         peer.node = msg_orignode(msg);
884
885         __tipc_port_connect(port->ref, port, &peer);
886
887         if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
888                 return -EINVAL;
889         msg_set_importance(&port->phdr, (u32)msg_importance(msg));
890         sock->state = SS_CONNECTED;
891         return 0;
892 }
893
894 /**
895  * set_orig_addr - capture sender's address for received message
896  * @m: descriptor for message info
897  * @msg: received message header
898  *
899  * Note: Address is not captured if not requested by receiver.
900  */
901 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
902 {
903         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
904
905         if (addr) {
906                 addr->family = AF_TIPC;
907                 addr->addrtype = TIPC_ADDR_ID;
908                 memset(&addr->addr, 0, sizeof(addr->addr));
909                 addr->addr.id.ref = msg_origport(msg);
910                 addr->addr.id.node = msg_orignode(msg);
911                 addr->addr.name.domain = 0;     /* could leave uninitialized */
912                 addr->scope = 0;                /* could leave uninitialized */
913                 m->msg_namelen = sizeof(struct sockaddr_tipc);
914         }
915 }
916
917 /**
918  * anc_data_recv - optionally capture ancillary data for received message
919  * @m: descriptor for message info
920  * @msg: received message header
921  * @tport: TIPC port associated with message
922  *
923  * Note: Ancillary data is not captured if not requested by receiver.
924  *
925  * Returns 0 if successful, otherwise errno
926  */
927 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
928                          struct tipc_port *tport)
929 {
930         u32 anc_data[3];
931         u32 err;
932         u32 dest_type;
933         int has_name;
934         int res;
935
936         if (likely(m->msg_controllen == 0))
937                 return 0;
938
939         /* Optionally capture errored message object(s) */
940         err = msg ? msg_errcode(msg) : 0;
941         if (unlikely(err)) {
942                 anc_data[0] = err;
943                 anc_data[1] = msg_data_sz(msg);
944                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
945                 if (res)
946                         return res;
947                 if (anc_data[1]) {
948                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
949                                        msg_data(msg));
950                         if (res)
951                                 return res;
952                 }
953         }
954
955         /* Optionally capture message destination object */
956         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
957         switch (dest_type) {
958         case TIPC_NAMED_MSG:
959                 has_name = 1;
960                 anc_data[0] = msg_nametype(msg);
961                 anc_data[1] = msg_namelower(msg);
962                 anc_data[2] = msg_namelower(msg);
963                 break;
964         case TIPC_MCAST_MSG:
965                 has_name = 1;
966                 anc_data[0] = msg_nametype(msg);
967                 anc_data[1] = msg_namelower(msg);
968                 anc_data[2] = msg_nameupper(msg);
969                 break;
970         case TIPC_CONN_MSG:
971                 has_name = (tport->conn_type != 0);
972                 anc_data[0] = tport->conn_type;
973                 anc_data[1] = tport->conn_instance;
974                 anc_data[2] = tport->conn_instance;
975                 break;
976         default:
977                 has_name = 0;
978         }
979         if (has_name) {
980                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
981                 if (res)
982                         return res;
983         }
984
985         return 0;
986 }
987
988 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
989 {
990         struct sock *sk = sock->sk;
991         DEFINE_WAIT(wait);
992         long timeo = *timeop;
993         int err;
994
995         for (;;) {
996                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
997                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
998                         if (sock->state == SS_DISCONNECTING) {
999                                 err = -ENOTCONN;
1000                                 break;
1001                         }
1002                         release_sock(sk);
1003                         timeo = schedule_timeout(timeo);
1004                         lock_sock(sk);
1005                 }
1006                 err = 0;
1007                 if (!skb_queue_empty(&sk->sk_receive_queue))
1008                         break;
1009                 err = sock_intr_errno(timeo);
1010                 if (signal_pending(current))
1011                         break;
1012                 err = -EAGAIN;
1013                 if (!timeo)
1014                         break;
1015         }
1016         finish_wait(sk_sleep(sk), &wait);
1017         *timeop = timeo;
1018         return err;
1019 }
1020
1021 /**
1022  * tipc_recvmsg - receive packet-oriented message
1023  * @iocb: (unused)
1024  * @m: descriptor for message info
1025  * @buf_len: total size of user buffer area
1026  * @flags: receive flags
1027  *
1028  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1029  * If the complete message doesn't fit in user area, truncate it.
1030  *
1031  * Returns size of returned message data, errno otherwise
1032  */
1033 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1034                         struct msghdr *m, size_t buf_len, int flags)
1035 {
1036         struct sock *sk = sock->sk;
1037         struct tipc_sock *tsk = tipc_sk(sk);
1038         struct tipc_port *port = &tsk->port;
1039         struct sk_buff *buf;
1040         struct tipc_msg *msg;
1041         long timeo;
1042         unsigned int sz;
1043         u32 err;
1044         int res;
1045
1046         /* Catch invalid receive requests */
1047         if (unlikely(!buf_len))
1048                 return -EINVAL;
1049
1050         lock_sock(sk);
1051
1052         if (unlikely(sock->state == SS_UNCONNECTED)) {
1053                 res = -ENOTCONN;
1054                 goto exit;
1055         }
1056
1057         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1058 restart:
1059
1060         /* Look for a message in receive queue; wait if necessary */
1061         res = tipc_wait_for_rcvmsg(sock, &timeo);
1062         if (res)
1063                 goto exit;
1064
1065         /* Look at first message in receive queue */
1066         buf = skb_peek(&sk->sk_receive_queue);
1067         msg = buf_msg(buf);
1068         sz = msg_data_sz(msg);
1069         err = msg_errcode(msg);
1070
1071         /* Discard an empty non-errored message & try again */
1072         if ((!sz) && (!err)) {
1073                 advance_rx_queue(sk);
1074                 goto restart;
1075         }
1076
1077         /* Capture sender's address (optional) */
1078         set_orig_addr(m, msg);
1079
1080         /* Capture ancillary data (optional) */
1081         res = anc_data_recv(m, msg, port);
1082         if (res)
1083                 goto exit;
1084
1085         /* Capture message data (if valid) & compute return value (always) */
1086         if (!err) {
1087                 if (unlikely(buf_len < sz)) {
1088                         sz = buf_len;
1089                         m->msg_flags |= MSG_TRUNC;
1090                 }
1091                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1092                                               m->msg_iov, sz);
1093                 if (res)
1094                         goto exit;
1095                 res = sz;
1096         } else {
1097                 if ((sock->state == SS_READY) ||
1098                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1099                         res = 0;
1100                 else
1101                         res = -ECONNRESET;
1102         }
1103
1104         /* Consume received message (optional) */
1105         if (likely(!(flags & MSG_PEEK))) {
1106                 if ((sock->state != SS_READY) &&
1107                     (++port->conn_unacked >= TIPC_CONNACK_INTV))
1108                         tipc_acknowledge(port->ref, port->conn_unacked);
1109                 advance_rx_queue(sk);
1110         }
1111 exit:
1112         release_sock(sk);
1113         return res;
1114 }
1115
1116 /**
1117  * tipc_recv_stream - receive stream-oriented data
1118  * @iocb: (unused)
1119  * @m: descriptor for message info
1120  * @buf_len: total size of user buffer area
1121  * @flags: receive flags
1122  *
1123  * Used for SOCK_STREAM messages only.  If not enough data is available
1124  * will optionally wait for more; never truncates data.
1125  *
1126  * Returns size of returned message data, errno otherwise
1127  */
1128 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1129                             struct msghdr *m, size_t buf_len, int flags)
1130 {
1131         struct sock *sk = sock->sk;
1132         struct tipc_sock *tsk = tipc_sk(sk);
1133         struct tipc_port *port = &tsk->port;
1134         struct sk_buff *buf;
1135         struct tipc_msg *msg;
1136         long timeo;
1137         unsigned int sz;
1138         int sz_to_copy, target, needed;
1139         int sz_copied = 0;
1140         u32 err;
1141         int res = 0;
1142
1143         /* Catch invalid receive attempts */
1144         if (unlikely(!buf_len))
1145                 return -EINVAL;
1146
1147         lock_sock(sk);
1148
1149         if (unlikely(sock->state == SS_UNCONNECTED)) {
1150                 res = -ENOTCONN;
1151                 goto exit;
1152         }
1153
1154         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1155         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1156
1157 restart:
1158         /* Look for a message in receive queue; wait if necessary */
1159         res = tipc_wait_for_rcvmsg(sock, &timeo);
1160         if (res)
1161                 goto exit;
1162
1163         /* Look at first message in receive queue */
1164         buf = skb_peek(&sk->sk_receive_queue);
1165         msg = buf_msg(buf);
1166         sz = msg_data_sz(msg);
1167         err = msg_errcode(msg);
1168
1169         /* Discard an empty non-errored message & try again */
1170         if ((!sz) && (!err)) {
1171                 advance_rx_queue(sk);
1172                 goto restart;
1173         }
1174
1175         /* Optionally capture sender's address & ancillary data of first msg */
1176         if (sz_copied == 0) {
1177                 set_orig_addr(m, msg);
1178                 res = anc_data_recv(m, msg, port);
1179                 if (res)
1180                         goto exit;
1181         }
1182
1183         /* Capture message data (if valid) & compute return value (always) */
1184         if (!err) {
1185                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1186
1187                 sz -= offset;
1188                 needed = (buf_len - sz_copied);
1189                 sz_to_copy = (sz <= needed) ? sz : needed;
1190
1191                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1192                                               m->msg_iov, sz_to_copy);
1193                 if (res)
1194                         goto exit;
1195
1196                 sz_copied += sz_to_copy;
1197
1198                 if (sz_to_copy < sz) {
1199                         if (!(flags & MSG_PEEK))
1200                                 TIPC_SKB_CB(buf)->handle =
1201                                 (void *)(unsigned long)(offset + sz_to_copy);
1202                         goto exit;
1203                 }
1204         } else {
1205                 if (sz_copied != 0)
1206                         goto exit; /* can't add error msg to valid data */
1207
1208                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1209                         res = 0;
1210                 else
1211                         res = -ECONNRESET;
1212         }
1213
1214         /* Consume received message (optional) */
1215         if (likely(!(flags & MSG_PEEK))) {
1216                 if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
1217                         tipc_acknowledge(port->ref, port->conn_unacked);
1218                 advance_rx_queue(sk);
1219         }
1220
1221         /* Loop around if more data is required */
1222         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1223             (!skb_queue_empty(&sk->sk_receive_queue) ||
1224             (sz_copied < target)) &&    /* and more is ready or required */
1225             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1226             (!err))                     /* and haven't reached a FIN */
1227                 goto restart;
1228
1229 exit:
1230         release_sock(sk);
1231         return sz_copied ? sz_copied : res;
1232 }
1233
1234 /**
1235  * tipc_write_space - wake up thread if port congestion is released
1236  * @sk: socket
1237  */
1238 static void tipc_write_space(struct sock *sk)
1239 {
1240         struct socket_wq *wq;
1241
1242         rcu_read_lock();
1243         wq = rcu_dereference(sk->sk_wq);
1244         if (wq_has_sleeper(wq))
1245                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1246                                                 POLLWRNORM | POLLWRBAND);
1247         rcu_read_unlock();
1248 }
1249
1250 /**
1251  * tipc_data_ready - wake up threads to indicate messages have been received
1252  * @sk: socket
1253  * @len: the length of messages
1254  */
1255 static void tipc_data_ready(struct sock *sk)
1256 {
1257         struct socket_wq *wq;
1258
1259         rcu_read_lock();
1260         wq = rcu_dereference(sk->sk_wq);
1261         if (wq_has_sleeper(wq))
1262                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1263                                                 POLLRDNORM | POLLRDBAND);
1264         rcu_read_unlock();
1265 }
1266
1267 /**
1268  * filter_connect - Handle all incoming messages for a connection-based socket
1269  * @tsk: TIPC socket
1270  * @msg: message
1271  *
1272  * Returns TIPC error status code and socket error status code
1273  * once it encounters some errors
1274  */
1275 static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1276 {
1277         struct sock *sk = &tsk->sk;
1278         struct tipc_port *port = &tsk->port;
1279         struct socket *sock = sk->sk_socket;
1280         struct tipc_msg *msg = buf_msg(*buf);
1281
1282         u32 retval = TIPC_ERR_NO_PORT;
1283         int res;
1284
1285         if (msg_mcast(msg))
1286                 return retval;
1287
1288         switch ((int)sock->state) {
1289         case SS_CONNECTED:
1290                 /* Accept only connection-based messages sent by peer */
1291                 if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {
1292                         if (unlikely(msg_errcode(msg))) {
1293                                 sock->state = SS_DISCONNECTING;
1294                                 __tipc_port_disconnect(port);
1295                         }
1296                         retval = TIPC_OK;
1297                 }
1298                 break;
1299         case SS_CONNECTING:
1300                 /* Accept only ACK or NACK message */
1301                 if (unlikely(msg_errcode(msg))) {
1302                         sock->state = SS_DISCONNECTING;
1303                         sk->sk_err = ECONNREFUSED;
1304                         retval = TIPC_OK;
1305                         break;
1306                 }
1307
1308                 if (unlikely(!msg_connected(msg)))
1309                         break;
1310
1311                 res = auto_connect(tsk, msg);
1312                 if (res) {
1313                         sock->state = SS_DISCONNECTING;
1314                         sk->sk_err = -res;
1315                         retval = TIPC_OK;
1316                         break;
1317                 }
1318
1319                 /* If an incoming message is an 'ACK-', it should be
1320                  * discarded here because it doesn't contain useful
1321                  * data. In addition, we should try to wake up
1322                  * connect() routine if sleeping.
1323                  */
1324                 if (msg_data_sz(msg) == 0) {
1325                         kfree_skb(*buf);
1326                         *buf = NULL;
1327                         if (waitqueue_active(sk_sleep(sk)))
1328                                 wake_up_interruptible(sk_sleep(sk));
1329                 }
1330                 retval = TIPC_OK;
1331                 break;
1332         case SS_LISTENING:
1333         case SS_UNCONNECTED:
1334                 /* Accept only SYN message */
1335                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1336                         retval = TIPC_OK;
1337                 break;
1338         case SS_DISCONNECTING:
1339                 break;
1340         default:
1341                 pr_err("Unknown socket state %u\n", sock->state);
1342         }
1343         return retval;
1344 }
1345
1346 /**
1347  * rcvbuf_limit - get proper overload limit of socket receive queue
1348  * @sk: socket
1349  * @buf: message
1350  *
1351  * For all connection oriented messages, irrespective of importance,
1352  * the default overload value (i.e. 67MB) is set as limit.
1353  *
1354  * For all connectionless messages, by default new queue limits are
1355  * as belows:
1356  *
1357  * TIPC_LOW_IMPORTANCE       (4 MB)
1358  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1359  * TIPC_HIGH_IMPORTANCE      (16 MB)
1360  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1361  *
1362  * Returns overload limit according to corresponding message importance
1363  */
1364 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1365 {
1366         struct tipc_msg *msg = buf_msg(buf);
1367
1368         if (msg_connected(msg))
1369                 return sysctl_tipc_rmem[2];
1370
1371         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1372                 msg_importance(msg);
1373 }
1374
1375 /**
1376  * filter_rcv - validate incoming message
1377  * @sk: socket
1378  * @buf: message
1379  *
1380  * Enqueues message on receive queue if acceptable; optionally handles
1381  * disconnect indication for a connected socket.
1382  *
1383  * Called with socket lock already taken; port lock may also be taken.
1384  *
1385  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1386  */
1387 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1388 {
1389         struct socket *sock = sk->sk_socket;
1390         struct tipc_sock *tsk = tipc_sk(sk);
1391         struct tipc_msg *msg = buf_msg(buf);
1392         unsigned int limit = rcvbuf_limit(sk, buf);
1393         u32 res = TIPC_OK;
1394
1395         /* Reject message if it is wrong sort of message for socket */
1396         if (msg_type(msg) > TIPC_DIRECT_MSG)
1397                 return TIPC_ERR_NO_PORT;
1398
1399         if (sock->state == SS_READY) {
1400                 if (msg_connected(msg))
1401                         return TIPC_ERR_NO_PORT;
1402         } else {
1403                 res = filter_connect(tsk, &buf);
1404                 if (res != TIPC_OK || buf == NULL)
1405                         return res;
1406         }
1407
1408         /* Reject message if there isn't room to queue it */
1409         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1410                 return TIPC_ERR_OVERLOAD;
1411
1412         /* Enqueue message */
1413         TIPC_SKB_CB(buf)->handle = NULL;
1414         __skb_queue_tail(&sk->sk_receive_queue, buf);
1415         skb_set_owner_r(buf, sk);
1416
1417         sk->sk_data_ready(sk);
1418         return TIPC_OK;
1419 }
1420
1421 /**
1422  * tipc_backlog_rcv - handle incoming message from backlog queue
1423  * @sk: socket
1424  * @buf: message
1425  *
1426  * Caller must hold socket lock, but not port lock.
1427  *
1428  * Returns 0
1429  */
1430 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1431 {
1432         u32 res;
1433         struct tipc_sock *tsk = tipc_sk(sk);
1434
1435         res = filter_rcv(sk, buf);
1436         if (unlikely(res))
1437                 tipc_reject_msg(buf, res);
1438
1439         if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1440                 atomic_add(buf->truesize, &tsk->dupl_rcvcnt);
1441
1442         return 0;
1443 }
1444
1445 /**
1446  * tipc_sk_rcv - handle incoming message
1447  * @buf: buffer containing arriving message
1448  * Consumes buffer
1449  * Returns 0 if success, or errno: -EHOSTUNREACH
1450  */
1451 int tipc_sk_rcv(struct sk_buff *buf)
1452 {
1453         struct tipc_sock *tsk;
1454         struct tipc_port *port;
1455         struct sock *sk;
1456         u32 dport = msg_destport(buf_msg(buf));
1457         int err = TIPC_OK;
1458         uint limit;
1459
1460         /* Forward unresolved named message */
1461         if (unlikely(!dport)) {
1462                 tipc_net_route_msg(buf);
1463                 return 0;
1464         }
1465
1466         /* Validate destination */
1467         port = tipc_port_lock(dport);
1468         if (unlikely(!port)) {
1469                 err = TIPC_ERR_NO_PORT;
1470                 goto exit;
1471         }
1472
1473         tsk = tipc_port_to_sock(port);
1474         sk = &tsk->sk;
1475
1476         /* Queue message */
1477         bh_lock_sock(sk);
1478
1479         if (!sock_owned_by_user(sk)) {
1480                 err = filter_rcv(sk, buf);
1481         } else {
1482                 if (sk->sk_backlog.len == 0)
1483                         atomic_set(&tsk->dupl_rcvcnt, 0);
1484                 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
1485                 if (sk_add_backlog(sk, buf, limit))
1486                         err = TIPC_ERR_OVERLOAD;
1487         }
1488
1489         bh_unlock_sock(sk);
1490         tipc_port_unlock(port);
1491
1492         if (likely(!err))
1493                 return 0;
1494 exit:
1495         tipc_reject_msg(buf, err);
1496         return -EHOSTUNREACH;
1497 }
1498
1499 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1500 {
1501         struct sock *sk = sock->sk;
1502         DEFINE_WAIT(wait);
1503         int done;
1504
1505         do {
1506                 int err = sock_error(sk);
1507                 if (err)
1508                         return err;
1509                 if (!*timeo_p)
1510                         return -ETIMEDOUT;
1511                 if (signal_pending(current))
1512                         return sock_intr_errno(*timeo_p);
1513
1514                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1515                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1516                 finish_wait(sk_sleep(sk), &wait);
1517         } while (!done);
1518         return 0;
1519 }
1520
1521 /**
1522  * tipc_connect - establish a connection to another TIPC port
1523  * @sock: socket structure
1524  * @dest: socket address for destination port
1525  * @destlen: size of socket address data structure
1526  * @flags: file-related flags associated with socket
1527  *
1528  * Returns 0 on success, errno otherwise
1529  */
1530 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1531                         int destlen, int flags)
1532 {
1533         struct sock *sk = sock->sk;
1534         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1535         struct msghdr m = {NULL,};
1536         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1537         socket_state previous;
1538         int res;
1539
1540         lock_sock(sk);
1541
1542         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1543         if (sock->state == SS_READY) {
1544                 res = -EOPNOTSUPP;
1545                 goto exit;
1546         }
1547
1548         /*
1549          * Reject connection attempt using multicast address
1550          *
1551          * Note: send_msg() validates the rest of the address fields,
1552          *       so there's no need to do it here
1553          */
1554         if (dst->addrtype == TIPC_ADDR_MCAST) {
1555                 res = -EINVAL;
1556                 goto exit;
1557         }
1558
1559         previous = sock->state;
1560         switch (sock->state) {
1561         case SS_UNCONNECTED:
1562                 /* Send a 'SYN-' to destination */
1563                 m.msg_name = dest;
1564                 m.msg_namelen = destlen;
1565
1566                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1567                  * indicate send_msg() is never blocked.
1568                  */
1569                 if (!timeout)
1570                         m.msg_flags = MSG_DONTWAIT;
1571
1572                 res = tipc_sendmsg(NULL, sock, &m, 0);
1573                 if ((res < 0) && (res != -EWOULDBLOCK))
1574                         goto exit;
1575
1576                 /* Just entered SS_CONNECTING state; the only
1577                  * difference is that return value in non-blocking
1578                  * case is EINPROGRESS, rather than EALREADY.
1579                  */
1580                 res = -EINPROGRESS;
1581         case SS_CONNECTING:
1582                 if (previous == SS_CONNECTING)
1583                         res = -EALREADY;
1584                 if (!timeout)
1585                         goto exit;
1586                 timeout = msecs_to_jiffies(timeout);
1587                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1588                 res = tipc_wait_for_connect(sock, &timeout);
1589                 break;
1590         case SS_CONNECTED:
1591                 res = -EISCONN;
1592                 break;
1593         default:
1594                 res = -EINVAL;
1595                 break;
1596         }
1597 exit:
1598         release_sock(sk);
1599         return res;
1600 }
1601
1602 /**
1603  * tipc_listen - allow socket to listen for incoming connections
1604  * @sock: socket structure
1605  * @len: (unused)
1606  *
1607  * Returns 0 on success, errno otherwise
1608  */
1609 static int tipc_listen(struct socket *sock, int len)
1610 {
1611         struct sock *sk = sock->sk;
1612         int res;
1613
1614         lock_sock(sk);
1615
1616         if (sock->state != SS_UNCONNECTED)
1617                 res = -EINVAL;
1618         else {
1619                 sock->state = SS_LISTENING;
1620                 res = 0;
1621         }
1622
1623         release_sock(sk);
1624         return res;
1625 }
1626
1627 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1628 {
1629         struct sock *sk = sock->sk;
1630         DEFINE_WAIT(wait);
1631         int err;
1632
1633         /* True wake-one mechanism for incoming connections: only
1634          * one process gets woken up, not the 'whole herd'.
1635          * Since we do not 'race & poll' for established sockets
1636          * anymore, the common case will execute the loop only once.
1637         */
1638         for (;;) {
1639                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1640                                           TASK_INTERRUPTIBLE);
1641                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1642                         release_sock(sk);
1643                         timeo = schedule_timeout(timeo);
1644                         lock_sock(sk);
1645                 }
1646                 err = 0;
1647                 if (!skb_queue_empty(&sk->sk_receive_queue))
1648                         break;
1649                 err = -EINVAL;
1650                 if (sock->state != SS_LISTENING)
1651                         break;
1652                 err = sock_intr_errno(timeo);
1653                 if (signal_pending(current))
1654                         break;
1655                 err = -EAGAIN;
1656                 if (!timeo)
1657                         break;
1658         }
1659         finish_wait(sk_sleep(sk), &wait);
1660         return err;
1661 }
1662
1663 /**
1664  * tipc_accept - wait for connection request
1665  * @sock: listening socket
1666  * @newsock: new socket that is to be connected
1667  * @flags: file-related flags associated with socket
1668  *
1669  * Returns 0 on success, errno otherwise
1670  */
1671 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1672 {
1673         struct sock *new_sk, *sk = sock->sk;
1674         struct sk_buff *buf;
1675         struct tipc_port *new_port;
1676         struct tipc_msg *msg;
1677         struct tipc_portid peer;
1678         u32 new_ref;
1679         long timeo;
1680         int res;
1681
1682         lock_sock(sk);
1683
1684         if (sock->state != SS_LISTENING) {
1685                 res = -EINVAL;
1686                 goto exit;
1687         }
1688         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1689         res = tipc_wait_for_accept(sock, timeo);
1690         if (res)
1691                 goto exit;
1692
1693         buf = skb_peek(&sk->sk_receive_queue);
1694
1695         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1696         if (res)
1697                 goto exit;
1698
1699         new_sk = new_sock->sk;
1700         new_port = &tipc_sk(new_sk)->port;
1701         new_ref = new_port->ref;
1702         msg = buf_msg(buf);
1703
1704         /* we lock on new_sk; but lockdep sees the lock on sk */
1705         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1706
1707         /*
1708          * Reject any stray messages received by new socket
1709          * before the socket lock was taken (very, very unlikely)
1710          */
1711         reject_rx_queue(new_sk);
1712
1713         /* Connect new socket to it's peer */
1714         peer.ref = msg_origport(msg);
1715         peer.node = msg_orignode(msg);
1716         tipc_port_connect(new_ref, &peer);
1717         new_sock->state = SS_CONNECTED;
1718
1719         tipc_port_set_importance(new_port, msg_importance(msg));
1720         if (msg_named(msg)) {
1721                 new_port->conn_type = msg_nametype(msg);
1722                 new_port->conn_instance = msg_nameinst(msg);
1723         }
1724
1725         /*
1726          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1727          * Respond to 'SYN+' by queuing it on new socket.
1728          */
1729         if (!msg_data_sz(msg)) {
1730                 struct msghdr m = {NULL,};
1731
1732                 advance_rx_queue(sk);
1733                 tipc_send_packet(NULL, new_sock, &m, 0);
1734         } else {
1735                 __skb_dequeue(&sk->sk_receive_queue);
1736                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1737                 skb_set_owner_r(buf, new_sk);
1738         }
1739         release_sock(new_sk);
1740 exit:
1741         release_sock(sk);
1742         return res;
1743 }
1744
1745 /**
1746  * tipc_shutdown - shutdown socket connection
1747  * @sock: socket structure
1748  * @how: direction to close (must be SHUT_RDWR)
1749  *
1750  * Terminates connection (if necessary), then purges socket's receive queue.
1751  *
1752  * Returns 0 on success, errno otherwise
1753  */
1754 static int tipc_shutdown(struct socket *sock, int how)
1755 {
1756         struct sock *sk = sock->sk;
1757         struct tipc_sock *tsk = tipc_sk(sk);
1758         struct tipc_port *port = &tsk->port;
1759         struct sk_buff *buf;
1760         int res;
1761
1762         if (how != SHUT_RDWR)
1763                 return -EINVAL;
1764
1765         lock_sock(sk);
1766
1767         switch (sock->state) {
1768         case SS_CONNECTING:
1769         case SS_CONNECTED:
1770
1771 restart:
1772                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1773                 buf = __skb_dequeue(&sk->sk_receive_queue);
1774                 if (buf) {
1775                         if (TIPC_SKB_CB(buf)->handle != NULL) {
1776                                 kfree_skb(buf);
1777                                 goto restart;
1778                         }
1779                         tipc_port_disconnect(port->ref);
1780                         tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1781                 } else {
1782                         tipc_port_shutdown(port->ref);
1783                 }
1784
1785                 sock->state = SS_DISCONNECTING;
1786
1787                 /* fall through */
1788
1789         case SS_DISCONNECTING:
1790
1791                 /* Discard any unreceived messages */
1792                 __skb_queue_purge(&sk->sk_receive_queue);
1793
1794                 /* Wake up anyone sleeping in poll */
1795                 sk->sk_state_change(sk);
1796                 res = 0;
1797                 break;
1798
1799         default:
1800                 res = -ENOTCONN;
1801         }
1802
1803         release_sock(sk);
1804         return res;
1805 }
1806
1807 /**
1808  * tipc_setsockopt - set socket option
1809  * @sock: socket structure
1810  * @lvl: option level
1811  * @opt: option identifier
1812  * @ov: pointer to new option value
1813  * @ol: length of option value
1814  *
1815  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1816  * (to ease compatibility).
1817  *
1818  * Returns 0 on success, errno otherwise
1819  */
1820 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1821                            char __user *ov, unsigned int ol)
1822 {
1823         struct sock *sk = sock->sk;
1824         struct tipc_sock *tsk = tipc_sk(sk);
1825         struct tipc_port *port = &tsk->port;
1826         u32 value;
1827         int res;
1828
1829         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1830                 return 0;
1831         if (lvl != SOL_TIPC)
1832                 return -ENOPROTOOPT;
1833         if (ol < sizeof(value))
1834                 return -EINVAL;
1835         res = get_user(value, (u32 __user *)ov);
1836         if (res)
1837                 return res;
1838
1839         lock_sock(sk);
1840
1841         switch (opt) {
1842         case TIPC_IMPORTANCE:
1843                 tipc_port_set_importance(port, value);
1844                 break;
1845         case TIPC_SRC_DROPPABLE:
1846                 if (sock->type != SOCK_STREAM)
1847                         tipc_port_set_unreliable(port, value);
1848                 else
1849                         res = -ENOPROTOOPT;
1850                 break;
1851         case TIPC_DEST_DROPPABLE:
1852                 tipc_port_set_unreturnable(port, value);
1853                 break;
1854         case TIPC_CONN_TIMEOUT:
1855                 tipc_sk(sk)->conn_timeout = value;
1856                 /* no need to set "res", since already 0 at this point */
1857                 break;
1858         default:
1859                 res = -EINVAL;
1860         }
1861
1862         release_sock(sk);
1863
1864         return res;
1865 }
1866
1867 /**
1868  * tipc_getsockopt - get socket option
1869  * @sock: socket structure
1870  * @lvl: option level
1871  * @opt: option identifier
1872  * @ov: receptacle for option value
1873  * @ol: receptacle for length of option value
1874  *
1875  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1876  * (to ease compatibility).
1877  *
1878  * Returns 0 on success, errno otherwise
1879  */
1880 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1881                            char __user *ov, int __user *ol)
1882 {
1883         struct sock *sk = sock->sk;
1884         struct tipc_sock *tsk = tipc_sk(sk);
1885         struct tipc_port *port = &tsk->port;
1886         int len;
1887         u32 value;
1888         int res;
1889
1890         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1891                 return put_user(0, ol);
1892         if (lvl != SOL_TIPC)
1893                 return -ENOPROTOOPT;
1894         res = get_user(len, ol);
1895         if (res)
1896                 return res;
1897
1898         lock_sock(sk);
1899
1900         switch (opt) {
1901         case TIPC_IMPORTANCE:
1902                 value = tipc_port_importance(port);
1903                 break;
1904         case TIPC_SRC_DROPPABLE:
1905                 value = tipc_port_unreliable(port);
1906                 break;
1907         case TIPC_DEST_DROPPABLE:
1908                 value = tipc_port_unreturnable(port);
1909                 break;
1910         case TIPC_CONN_TIMEOUT:
1911                 value = tipc_sk(sk)->conn_timeout;
1912                 /* no need to set "res", since already 0 at this point */
1913                 break;
1914         case TIPC_NODE_RECVQ_DEPTH:
1915                 value = 0; /* was tipc_queue_size, now obsolete */
1916                 break;
1917         case TIPC_SOCK_RECVQ_DEPTH:
1918                 value = skb_queue_len(&sk->sk_receive_queue);
1919                 break;
1920         default:
1921                 res = -EINVAL;
1922         }
1923
1924         release_sock(sk);
1925
1926         if (res)
1927                 return res;     /* "get" failed */
1928
1929         if (len < sizeof(value))
1930                 return -EINVAL;
1931
1932         if (copy_to_user(ov, &value, sizeof(value)))
1933                 return -EFAULT;
1934
1935         return put_user(sizeof(value), ol);
1936 }
1937
1938 int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
1939 {
1940         struct tipc_sioc_ln_req lnr;
1941         void __user *argp = (void __user *)arg;
1942
1943         switch (cmd) {
1944         case SIOCGETLINKNAME:
1945                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
1946                         return -EFAULT;
1947                 if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer,
1948                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
1949                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
1950                                 return -EFAULT;
1951                         return 0;
1952                 }
1953                 return -EADDRNOTAVAIL;
1954                 break;
1955         default:
1956                 return -ENOIOCTLCMD;
1957         }
1958 }
1959
1960 /* Protocol switches for the various types of TIPC sockets */
1961
1962 static const struct proto_ops msg_ops = {
1963         .owner          = THIS_MODULE,
1964         .family         = AF_TIPC,
1965         .release        = tipc_release,
1966         .bind           = tipc_bind,
1967         .connect        = tipc_connect,
1968         .socketpair     = sock_no_socketpair,
1969         .accept         = sock_no_accept,
1970         .getname        = tipc_getname,
1971         .poll           = tipc_poll,
1972         .ioctl          = tipc_ioctl,
1973         .listen         = sock_no_listen,
1974         .shutdown       = tipc_shutdown,
1975         .setsockopt     = tipc_setsockopt,
1976         .getsockopt     = tipc_getsockopt,
1977         .sendmsg        = tipc_sendmsg,
1978         .recvmsg        = tipc_recvmsg,
1979         .mmap           = sock_no_mmap,
1980         .sendpage       = sock_no_sendpage
1981 };
1982
1983 static const struct proto_ops packet_ops = {
1984         .owner          = THIS_MODULE,
1985         .family         = AF_TIPC,
1986         .release        = tipc_release,
1987         .bind           = tipc_bind,
1988         .connect        = tipc_connect,
1989         .socketpair     = sock_no_socketpair,
1990         .accept         = tipc_accept,
1991         .getname        = tipc_getname,
1992         .poll           = tipc_poll,
1993         .ioctl          = tipc_ioctl,
1994         .listen         = tipc_listen,
1995         .shutdown       = tipc_shutdown,
1996         .setsockopt     = tipc_setsockopt,
1997         .getsockopt     = tipc_getsockopt,
1998         .sendmsg        = tipc_send_packet,
1999         .recvmsg        = tipc_recvmsg,
2000         .mmap           = sock_no_mmap,
2001         .sendpage       = sock_no_sendpage
2002 };
2003
2004 static const struct proto_ops stream_ops = {
2005         .owner          = THIS_MODULE,
2006         .family         = AF_TIPC,
2007         .release        = tipc_release,
2008         .bind           = tipc_bind,
2009         .connect        = tipc_connect,
2010         .socketpair     = sock_no_socketpair,
2011         .accept         = tipc_accept,
2012         .getname        = tipc_getname,
2013         .poll           = tipc_poll,
2014         .ioctl          = tipc_ioctl,
2015         .listen         = tipc_listen,
2016         .shutdown       = tipc_shutdown,
2017         .setsockopt     = tipc_setsockopt,
2018         .getsockopt     = tipc_getsockopt,
2019         .sendmsg        = tipc_send_stream,
2020         .recvmsg        = tipc_recv_stream,
2021         .mmap           = sock_no_mmap,
2022         .sendpage       = sock_no_sendpage
2023 };
2024
2025 static const struct net_proto_family tipc_family_ops = {
2026         .owner          = THIS_MODULE,
2027         .family         = AF_TIPC,
2028         .create         = tipc_sk_create
2029 };
2030
2031 static struct proto tipc_proto = {
2032         .name           = "TIPC",
2033         .owner          = THIS_MODULE,
2034         .obj_size       = sizeof(struct tipc_sock),
2035         .sysctl_rmem    = sysctl_tipc_rmem
2036 };
2037
2038 static struct proto tipc_proto_kern = {
2039         .name           = "TIPC",
2040         .obj_size       = sizeof(struct tipc_sock),
2041         .sysctl_rmem    = sysctl_tipc_rmem
2042 };
2043
2044 /**
2045  * tipc_socket_init - initialize TIPC socket interface
2046  *
2047  * Returns 0 on success, errno otherwise
2048  */
2049 int tipc_socket_init(void)
2050 {
2051         int res;
2052
2053         res = proto_register(&tipc_proto, 1);
2054         if (res) {
2055                 pr_err("Failed to register TIPC protocol type\n");
2056                 goto out;
2057         }
2058
2059         res = sock_register(&tipc_family_ops);
2060         if (res) {
2061                 pr_err("Failed to register TIPC socket type\n");
2062                 proto_unregister(&tipc_proto);
2063                 goto out;
2064         }
2065  out:
2066         return res;
2067 }
2068
2069 /**
2070  * tipc_socket_stop - stop TIPC socket interface
2071  */
2072 void tipc_socket_stop(void)
2073 {
2074         sock_unregister(tipc_family_ops.family);
2075         proto_unregister(&tipc_proto);
2076 }