Merge tag 'at91-fixes' of git://github.com/at91linux/linux-at91 into fixes
[sfrench/cifs-2.6.git] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41
42 /* Connection management: */
43 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
44 #define CONFIRMED 0
45 #define PROBING 1
46
47 #define MAX_REJECT_SIZE 1024
48
49 static struct sk_buff *msg_queue_head;
50 static struct sk_buff *msg_queue_tail;
51
52 DEFINE_SPINLOCK(tipc_port_list_lock);
53 static DEFINE_SPINLOCK(queue_lock);
54
55 static LIST_HEAD(ports);
56 static void port_handle_node_down(unsigned long ref);
57 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
58 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
59 static void port_timeout(unsigned long ref);
60
61
62 static u32 port_peernode(struct tipc_port *p_ptr)
63 {
64         return msg_destnode(&p_ptr->phdr);
65 }
66
67 static u32 port_peerport(struct tipc_port *p_ptr)
68 {
69         return msg_destport(&p_ptr->phdr);
70 }
71
72 /**
73  * tipc_port_peer_msg - verify message was sent by connected port's peer
74  *
75  * Handles cases where the node's network address has changed from
76  * the default of <0.0.0> to its configured setting.
77  */
78 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
79 {
80         u32 peernode;
81         u32 orignode;
82
83         if (msg_origport(msg) != port_peerport(p_ptr))
84                 return 0;
85
86         orignode = msg_orignode(msg);
87         peernode = port_peernode(p_ptr);
88         return (orignode == peernode) ||
89                 (!orignode && (peernode == tipc_own_addr)) ||
90                 (!peernode && (orignode == tipc_own_addr));
91 }
92
93 /**
94  * tipc_multicast - send a multicast message to local and remote destinations
95  */
96 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
97                    u32 num_sect, struct iovec const *msg_sect,
98                    unsigned int total_len)
99 {
100         struct tipc_msg *hdr;
101         struct sk_buff *buf;
102         struct sk_buff *ibuf = NULL;
103         struct tipc_port_list dports = {0, NULL, };
104         struct tipc_port *oport = tipc_port_deref(ref);
105         int ext_targets;
106         int res;
107
108         if (unlikely(!oport))
109                 return -EINVAL;
110
111         /* Create multicast message */
112         hdr = &oport->phdr;
113         msg_set_type(hdr, TIPC_MCAST_MSG);
114         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
115         msg_set_destport(hdr, 0);
116         msg_set_destnode(hdr, 0);
117         msg_set_nametype(hdr, seq->type);
118         msg_set_namelower(hdr, seq->lower);
119         msg_set_nameupper(hdr, seq->upper);
120         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
121         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
122                         !oport->user_port, &buf);
123         if (unlikely(!buf))
124                 return res;
125
126         /* Figure out where to send multicast message */
127         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
128                                                 TIPC_NODE_SCOPE, &dports);
129
130         /* Send message to destinations (duplicate it only if necessary) */
131         if (ext_targets) {
132                 if (dports.count != 0) {
133                         ibuf = skb_copy(buf, GFP_ATOMIC);
134                         if (ibuf == NULL) {
135                                 tipc_port_list_free(&dports);
136                                 kfree_skb(buf);
137                                 return -ENOMEM;
138                         }
139                 }
140                 res = tipc_bclink_send_msg(buf);
141                 if ((res < 0) && (dports.count != 0))
142                         kfree_skb(ibuf);
143         } else {
144                 ibuf = buf;
145         }
146
147         if (res >= 0) {
148                 if (ibuf)
149                         tipc_port_recv_mcast(ibuf, &dports);
150         } else {
151                 tipc_port_list_free(&dports);
152         }
153         return res;
154 }
155
156 /**
157  * tipc_port_recv_mcast - deliver multicast message to all destination ports
158  *
159  * If there is no port list, perform a lookup to create one
160  */
161 void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
162 {
163         struct tipc_msg *msg;
164         struct tipc_port_list dports = {0, NULL, };
165         struct tipc_port_list *item = dp;
166         int cnt = 0;
167
168         msg = buf_msg(buf);
169
170         /* Create destination port list, if one wasn't supplied */
171         if (dp == NULL) {
172                 tipc_nametbl_mc_translate(msg_nametype(msg),
173                                      msg_namelower(msg),
174                                      msg_nameupper(msg),
175                                      TIPC_CLUSTER_SCOPE,
176                                      &dports);
177                 item = dp = &dports;
178         }
179
180         /* Deliver a copy of message to each destination port */
181         if (dp->count != 0) {
182                 msg_set_destnode(msg, tipc_own_addr);
183                 if (dp->count == 1) {
184                         msg_set_destport(msg, dp->ports[0]);
185                         tipc_port_recv_msg(buf);
186                         tipc_port_list_free(dp);
187                         return;
188                 }
189                 for (; cnt < dp->count; cnt++) {
190                         int index = cnt % PLSIZE;
191                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
192
193                         if (b == NULL) {
194                                 pr_warn("Unable to deliver multicast message(s)\n");
195                                 goto exit;
196                         }
197                         if ((index == 0) && (cnt != 0))
198                                 item = item->next;
199                         msg_set_destport(buf_msg(b), item->ports[index]);
200                         tipc_port_recv_msg(b);
201                 }
202         }
203 exit:
204         kfree_skb(buf);
205         tipc_port_list_free(dp);
206 }
207
208 /**
209  * tipc_createport_raw - create a generic TIPC port
210  *
211  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
212  */
213 struct tipc_port *tipc_createport_raw(void *usr_handle,
214                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
215                         void (*wakeup)(struct tipc_port *),
216                         const u32 importance)
217 {
218         struct tipc_port *p_ptr;
219         struct tipc_msg *msg;
220         u32 ref;
221
222         p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
223         if (!p_ptr) {
224                 pr_warn("Port creation failed, no memory\n");
225                 return NULL;
226         }
227         ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
228         if (!ref) {
229                 pr_warn("Port creation failed, ref. table exhausted\n");
230                 kfree(p_ptr);
231                 return NULL;
232         }
233
234         p_ptr->usr_handle = usr_handle;
235         p_ptr->max_pkt = MAX_PKT_DEFAULT;
236         p_ptr->ref = ref;
237         INIT_LIST_HEAD(&p_ptr->wait_list);
238         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
239         p_ptr->dispatcher = dispatcher;
240         p_ptr->wakeup = wakeup;
241         p_ptr->user_port = NULL;
242         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
243         INIT_LIST_HEAD(&p_ptr->publications);
244         INIT_LIST_HEAD(&p_ptr->port_list);
245
246         /*
247          * Must hold port list lock while initializing message header template
248          * to ensure a change to node's own network address doesn't result
249          * in template containing out-dated network address information
250          */
251         spin_lock_bh(&tipc_port_list_lock);
252         msg = &p_ptr->phdr;
253         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
254         msg_set_origport(msg, ref);
255         list_add_tail(&p_ptr->port_list, &ports);
256         spin_unlock_bh(&tipc_port_list_lock);
257         return p_ptr;
258 }
259
260 int tipc_deleteport(u32 ref)
261 {
262         struct tipc_port *p_ptr;
263         struct sk_buff *buf = NULL;
264
265         tipc_withdraw(ref, 0, NULL);
266         p_ptr = tipc_port_lock(ref);
267         if (!p_ptr)
268                 return -EINVAL;
269
270         tipc_ref_discard(ref);
271         tipc_port_unlock(p_ptr);
272
273         k_cancel_timer(&p_ptr->timer);
274         if (p_ptr->connected) {
275                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
276                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
277         }
278         kfree(p_ptr->user_port);
279
280         spin_lock_bh(&tipc_port_list_lock);
281         list_del(&p_ptr->port_list);
282         list_del(&p_ptr->wait_list);
283         spin_unlock_bh(&tipc_port_list_lock);
284         k_term_timer(&p_ptr->timer);
285         kfree(p_ptr);
286         tipc_net_route_msg(buf);
287         return 0;
288 }
289
290 static int port_unreliable(struct tipc_port *p_ptr)
291 {
292         return msg_src_droppable(&p_ptr->phdr);
293 }
294
295 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
296 {
297         struct tipc_port *p_ptr;
298
299         p_ptr = tipc_port_lock(ref);
300         if (!p_ptr)
301                 return -EINVAL;
302         *isunreliable = port_unreliable(p_ptr);
303         tipc_port_unlock(p_ptr);
304         return 0;
305 }
306
307 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
308 {
309         struct tipc_port *p_ptr;
310
311         p_ptr = tipc_port_lock(ref);
312         if (!p_ptr)
313                 return -EINVAL;
314         msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
315         tipc_port_unlock(p_ptr);
316         return 0;
317 }
318
319 static int port_unreturnable(struct tipc_port *p_ptr)
320 {
321         return msg_dest_droppable(&p_ptr->phdr);
322 }
323
324 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
325 {
326         struct tipc_port *p_ptr;
327
328         p_ptr = tipc_port_lock(ref);
329         if (!p_ptr)
330                 return -EINVAL;
331         *isunrejectable = port_unreturnable(p_ptr);
332         tipc_port_unlock(p_ptr);
333         return 0;
334 }
335
336 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
337 {
338         struct tipc_port *p_ptr;
339
340         p_ptr = tipc_port_lock(ref);
341         if (!p_ptr)
342                 return -EINVAL;
343         msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
344         tipc_port_unlock(p_ptr);
345         return 0;
346 }
347
348 /*
349  * port_build_proto_msg(): create connection protocol message for port
350  *
351  * On entry the port must be locked and connected.
352  */
353 static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
354                                             u32 type, u32 ack)
355 {
356         struct sk_buff *buf;
357         struct tipc_msg *msg;
358
359         buf = tipc_buf_acquire(INT_H_SIZE);
360         if (buf) {
361                 msg = buf_msg(buf);
362                 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
363                               port_peernode(p_ptr));
364                 msg_set_destport(msg, port_peerport(p_ptr));
365                 msg_set_origport(msg, p_ptr->ref);
366                 msg_set_msgcnt(msg, ack);
367         }
368         return buf;
369 }
370
371 int tipc_reject_msg(struct sk_buff *buf, u32 err)
372 {
373         struct tipc_msg *msg = buf_msg(buf);
374         struct sk_buff *rbuf;
375         struct tipc_msg *rmsg;
376         int hdr_sz;
377         u32 imp;
378         u32 data_sz = msg_data_sz(msg);
379         u32 src_node;
380         u32 rmsg_sz;
381
382         /* discard rejected message if it shouldn't be returned to sender */
383         if (WARN(!msg_isdata(msg),
384                  "attempt to reject message with user=%u", msg_user(msg))) {
385                 dump_stack();
386                 goto exit;
387         }
388         if (msg_errcode(msg) || msg_dest_droppable(msg))
389                 goto exit;
390
391         /*
392          * construct returned message by copying rejected message header and
393          * data (or subset), then updating header fields that need adjusting
394          */
395         hdr_sz = msg_hdr_sz(msg);
396         rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
397
398         rbuf = tipc_buf_acquire(rmsg_sz);
399         if (rbuf == NULL)
400                 goto exit;
401
402         rmsg = buf_msg(rbuf);
403         skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
404
405         if (msg_connected(rmsg)) {
406                 imp = msg_importance(rmsg);
407                 if (imp < TIPC_CRITICAL_IMPORTANCE)
408                         msg_set_importance(rmsg, ++imp);
409         }
410         msg_set_non_seq(rmsg, 0);
411         msg_set_size(rmsg, rmsg_sz);
412         msg_set_errcode(rmsg, err);
413         msg_set_prevnode(rmsg, tipc_own_addr);
414         msg_swap_words(rmsg, 4, 5);
415         if (!msg_short(rmsg))
416                 msg_swap_words(rmsg, 6, 7);
417
418         /* send self-abort message when rejecting on a connected port */
419         if (msg_connected(msg)) {
420                 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
421
422                 if (p_ptr) {
423                         struct sk_buff *abuf = NULL;
424
425                         if (p_ptr->connected)
426                                 abuf = port_build_self_abort_msg(p_ptr, err);
427                         tipc_port_unlock(p_ptr);
428                         tipc_net_route_msg(abuf);
429                 }
430         }
431
432         /* send returned message & dispose of rejected message */
433         src_node = msg_prevnode(msg);
434         if (in_own_node(src_node))
435                 tipc_port_recv_msg(rbuf);
436         else
437                 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
438 exit:
439         kfree_skb(buf);
440         return data_sz;
441 }
442
443 int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
444                               struct iovec const *msg_sect, u32 num_sect,
445                               unsigned int total_len, int err)
446 {
447         struct sk_buff *buf;
448         int res;
449
450         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
451                         !p_ptr->user_port, &buf);
452         if (!buf)
453                 return res;
454
455         return tipc_reject_msg(buf, err);
456 }
457
458 static void port_timeout(unsigned long ref)
459 {
460         struct tipc_port *p_ptr = tipc_port_lock(ref);
461         struct sk_buff *buf = NULL;
462
463         if (!p_ptr)
464                 return;
465
466         if (!p_ptr->connected) {
467                 tipc_port_unlock(p_ptr);
468                 return;
469         }
470
471         /* Last probe answered ? */
472         if (p_ptr->probing_state == PROBING) {
473                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
474         } else {
475                 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
476                 p_ptr->probing_state = PROBING;
477                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
478         }
479         tipc_port_unlock(p_ptr);
480         tipc_net_route_msg(buf);
481 }
482
483
484 static void port_handle_node_down(unsigned long ref)
485 {
486         struct tipc_port *p_ptr = tipc_port_lock(ref);
487         struct sk_buff *buf = NULL;
488
489         if (!p_ptr)
490                 return;
491         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
492         tipc_port_unlock(p_ptr);
493         tipc_net_route_msg(buf);
494 }
495
496
497 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
498 {
499         struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
500
501         if (buf) {
502                 struct tipc_msg *msg = buf_msg(buf);
503                 msg_swap_words(msg, 4, 5);
504                 msg_swap_words(msg, 6, 7);
505         }
506         return buf;
507 }
508
509
510 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
511 {
512         struct sk_buff *buf;
513         struct tipc_msg *msg;
514         u32 imp;
515
516         if (!p_ptr->connected)
517                 return NULL;
518
519         buf = tipc_buf_acquire(BASIC_H_SIZE);
520         if (buf) {
521                 msg = buf_msg(buf);
522                 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
523                 msg_set_hdr_sz(msg, BASIC_H_SIZE);
524                 msg_set_size(msg, BASIC_H_SIZE);
525                 imp = msg_importance(msg);
526                 if (imp < TIPC_CRITICAL_IMPORTANCE)
527                         msg_set_importance(msg, ++imp);
528                 msg_set_errcode(msg, err);
529         }
530         return buf;
531 }
532
533 void tipc_port_recv_proto_msg(struct sk_buff *buf)
534 {
535         struct tipc_msg *msg = buf_msg(buf);
536         struct tipc_port *p_ptr;
537         struct sk_buff *r_buf = NULL;
538         u32 destport = msg_destport(msg);
539         int wakeable;
540
541         /* Validate connection */
542         p_ptr = tipc_port_lock(destport);
543         if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
544                 r_buf = tipc_buf_acquire(BASIC_H_SIZE);
545                 if (r_buf) {
546                         msg = buf_msg(r_buf);
547                         tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
548                                       BASIC_H_SIZE, msg_orignode(msg));
549                         msg_set_errcode(msg, TIPC_ERR_NO_PORT);
550                         msg_set_origport(msg, destport);
551                         msg_set_destport(msg, msg_origport(msg));
552                 }
553                 if (p_ptr)
554                         tipc_port_unlock(p_ptr);
555                 goto exit;
556         }
557
558         /* Process protocol message sent by peer */
559         switch (msg_type(msg)) {
560         case CONN_ACK:
561                 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
562                         p_ptr->wakeup;
563                 p_ptr->acked += msg_msgcnt(msg);
564                 if (!tipc_port_congested(p_ptr)) {
565                         p_ptr->congested = 0;
566                         if (wakeable)
567                                 p_ptr->wakeup(p_ptr);
568                 }
569                 break;
570         case CONN_PROBE:
571                 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
572                 break;
573         default:
574                 /* CONN_PROBE_REPLY or unrecognized - no action required */
575                 break;
576         }
577         p_ptr->probing_state = CONFIRMED;
578         tipc_port_unlock(p_ptr);
579 exit:
580         tipc_net_route_msg(r_buf);
581         kfree_skb(buf);
582 }
583
584 static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
585 {
586         struct publication *publ;
587         int ret;
588
589         if (full_id)
590                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
591                                     tipc_zone(tipc_own_addr),
592                                     tipc_cluster(tipc_own_addr),
593                                     tipc_node(tipc_own_addr), p_ptr->ref);
594         else
595                 ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref);
596
597         if (p_ptr->connected) {
598                 u32 dport = port_peerport(p_ptr);
599                 u32 destnode = port_peernode(p_ptr);
600
601                 ret += tipc_snprintf(buf + ret, len - ret,
602                                      " connected to <%u.%u.%u:%u>",
603                                      tipc_zone(destnode),
604                                      tipc_cluster(destnode),
605                                      tipc_node(destnode), dport);
606                 if (p_ptr->conn_type != 0)
607                         ret += tipc_snprintf(buf + ret, len - ret,
608                                              " via {%u,%u}", p_ptr->conn_type,
609                                              p_ptr->conn_instance);
610         } else if (p_ptr->published) {
611                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
612                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
613                         if (publ->lower == publ->upper)
614                                 ret += tipc_snprintf(buf + ret, len - ret,
615                                                      " {%u,%u}", publ->type,
616                                                      publ->lower);
617                         else
618                                 ret += tipc_snprintf(buf + ret, len - ret,
619                                                      " {%u,%u,%u}", publ->type,
620                                                      publ->lower, publ->upper);
621                 }
622         }
623         ret += tipc_snprintf(buf + ret, len - ret, "\n");
624         return ret;
625 }
626
627 struct sk_buff *tipc_port_get_ports(void)
628 {
629         struct sk_buff *buf;
630         struct tlv_desc *rep_tlv;
631         char *pb;
632         int pb_len;
633         struct tipc_port *p_ptr;
634         int str_len = 0;
635
636         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
637         if (!buf)
638                 return NULL;
639         rep_tlv = (struct tlv_desc *)buf->data;
640         pb = TLV_DATA(rep_tlv);
641         pb_len = ULTRA_STRING_MAX_LEN;
642
643         spin_lock_bh(&tipc_port_list_lock);
644         list_for_each_entry(p_ptr, &ports, port_list) {
645                 spin_lock_bh(p_ptr->lock);
646                 str_len += port_print(p_ptr, pb, pb_len, 0);
647                 spin_unlock_bh(p_ptr->lock);
648         }
649         spin_unlock_bh(&tipc_port_list_lock);
650         str_len += 1;   /* for "\0" */
651         skb_put(buf, TLV_SPACE(str_len));
652         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
653
654         return buf;
655 }
656
657 void tipc_port_reinit(void)
658 {
659         struct tipc_port *p_ptr;
660         struct tipc_msg *msg;
661
662         spin_lock_bh(&tipc_port_list_lock);
663         list_for_each_entry(p_ptr, &ports, port_list) {
664                 msg = &p_ptr->phdr;
665                 msg_set_prevnode(msg, tipc_own_addr);
666                 msg_set_orignode(msg, tipc_own_addr);
667         }
668         spin_unlock_bh(&tipc_port_list_lock);
669 }
670
671
672 /*
673  *  port_dispatcher_sigh(): Signal handler for messages destinated
674  *                          to the tipc_port interface.
675  */
676 static void port_dispatcher_sigh(void *dummy)
677 {
678         struct sk_buff *buf;
679
680         spin_lock_bh(&queue_lock);
681         buf = msg_queue_head;
682         msg_queue_head = NULL;
683         spin_unlock_bh(&queue_lock);
684
685         while (buf) {
686                 struct tipc_port *p_ptr;
687                 struct user_port *up_ptr;
688                 struct tipc_portid orig;
689                 struct tipc_name_seq dseq;
690                 void *usr_handle;
691                 int connected;
692                 int peer_invalid;
693                 int published;
694                 u32 message_type;
695
696                 struct sk_buff *next = buf->next;
697                 struct tipc_msg *msg = buf_msg(buf);
698                 u32 dref = msg_destport(msg);
699
700                 message_type = msg_type(msg);
701                 if (message_type > TIPC_DIRECT_MSG)
702                         goto reject;    /* Unsupported message type */
703
704                 p_ptr = tipc_port_lock(dref);
705                 if (!p_ptr)
706                         goto reject;    /* Port deleted while msg in queue */
707
708                 orig.ref = msg_origport(msg);
709                 orig.node = msg_orignode(msg);
710                 up_ptr = p_ptr->user_port;
711                 usr_handle = up_ptr->usr_handle;
712                 connected = p_ptr->connected;
713                 peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg);
714                 published = p_ptr->published;
715
716                 if (unlikely(msg_errcode(msg)))
717                         goto err;
718
719                 switch (message_type) {
720
721                 case TIPC_CONN_MSG:{
722                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
723                                 u32 dsz;
724
725                                 tipc_port_unlock(p_ptr);
726                                 if (unlikely(!cb))
727                                         goto reject;
728                                 if (unlikely(!connected)) {
729                                         if (tipc_connect2port(dref, &orig))
730                                                 goto reject;
731                                 } else if (peer_invalid)
732                                         goto reject;
733                                 dsz = msg_data_sz(msg);
734                                 if (unlikely(dsz &&
735                                              (++p_ptr->conn_unacked >=
736                                               TIPC_FLOW_CONTROL_WIN)))
737                                         tipc_acknowledge(dref,
738                                                          p_ptr->conn_unacked);
739                                 skb_pull(buf, msg_hdr_sz(msg));
740                                 cb(usr_handle, dref, &buf, msg_data(msg), dsz);
741                                 break;
742                         }
743                 case TIPC_DIRECT_MSG:{
744                                 tipc_msg_event cb = up_ptr->msg_cb;
745
746                                 tipc_port_unlock(p_ptr);
747                                 if (unlikely(!cb || connected))
748                                         goto reject;
749                                 skb_pull(buf, msg_hdr_sz(msg));
750                                 cb(usr_handle, dref, &buf, msg_data(msg),
751                                    msg_data_sz(msg), msg_importance(msg),
752                                    &orig);
753                                 break;
754                         }
755                 case TIPC_MCAST_MSG:
756                 case TIPC_NAMED_MSG:{
757                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
758
759                                 tipc_port_unlock(p_ptr);
760                                 if (unlikely(!cb || connected || !published))
761                                         goto reject;
762                                 dseq.type =  msg_nametype(msg);
763                                 dseq.lower = msg_nameinst(msg);
764                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
765                                         ? dseq.lower : msg_nameupper(msg);
766                                 skb_pull(buf, msg_hdr_sz(msg));
767                                 cb(usr_handle, dref, &buf, msg_data(msg),
768                                    msg_data_sz(msg), msg_importance(msg),
769                                    &orig, &dseq);
770                                 break;
771                         }
772                 }
773                 if (buf)
774                         kfree_skb(buf);
775                 buf = next;
776                 continue;
777 err:
778                 switch (message_type) {
779
780                 case TIPC_CONN_MSG:{
781                                 tipc_conn_shutdown_event cb =
782                                         up_ptr->conn_err_cb;
783
784                                 tipc_port_unlock(p_ptr);
785                                 if (!cb || !connected || peer_invalid)
786                                         break;
787                                 tipc_disconnect(dref);
788                                 skb_pull(buf, msg_hdr_sz(msg));
789                                 cb(usr_handle, dref, &buf, msg_data(msg),
790                                    msg_data_sz(msg), msg_errcode(msg));
791                                 break;
792                         }
793                 case TIPC_DIRECT_MSG:{
794                                 tipc_msg_err_event cb = up_ptr->err_cb;
795
796                                 tipc_port_unlock(p_ptr);
797                                 if (!cb || connected)
798                                         break;
799                                 skb_pull(buf, msg_hdr_sz(msg));
800                                 cb(usr_handle, dref, &buf, msg_data(msg),
801                                    msg_data_sz(msg), msg_errcode(msg), &orig);
802                                 break;
803                         }
804                 case TIPC_MCAST_MSG:
805                 case TIPC_NAMED_MSG:{
806                                 tipc_named_msg_err_event cb =
807                                         up_ptr->named_err_cb;
808
809                                 tipc_port_unlock(p_ptr);
810                                 if (!cb || connected)
811                                         break;
812                                 dseq.type =  msg_nametype(msg);
813                                 dseq.lower = msg_nameinst(msg);
814                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
815                                         ? dseq.lower : msg_nameupper(msg);
816                                 skb_pull(buf, msg_hdr_sz(msg));
817                                 cb(usr_handle, dref, &buf, msg_data(msg),
818                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
819                                 break;
820                         }
821                 }
822                 if (buf)
823                         kfree_skb(buf);
824                 buf = next;
825                 continue;
826 reject:
827                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
828                 buf = next;
829         }
830 }
831
832 /*
833  *  port_dispatcher(): Dispatcher for messages destinated
834  *  to the tipc_port interface. Called with port locked.
835  */
836 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
837 {
838         buf->next = NULL;
839         spin_lock_bh(&queue_lock);
840         if (msg_queue_head) {
841                 msg_queue_tail->next = buf;
842                 msg_queue_tail = buf;
843         } else {
844                 msg_queue_tail = msg_queue_head = buf;
845                 tipc_k_signal((Handler)port_dispatcher_sigh, 0);
846         }
847         spin_unlock_bh(&queue_lock);
848         return 0;
849 }
850
851 /*
852  * Wake up port after congestion: Called with port locked
853  */
854 static void port_wakeup_sh(unsigned long ref)
855 {
856         struct tipc_port *p_ptr;
857         struct user_port *up_ptr;
858         tipc_continue_event cb = NULL;
859         void *uh = NULL;
860
861         p_ptr = tipc_port_lock(ref);
862         if (p_ptr) {
863                 up_ptr = p_ptr->user_port;
864                 if (up_ptr) {
865                         cb = up_ptr->continue_event_cb;
866                         uh = up_ptr->usr_handle;
867                 }
868                 tipc_port_unlock(p_ptr);
869         }
870         if (cb)
871                 cb(uh, ref);
872 }
873
874
875 static void port_wakeup(struct tipc_port *p_ptr)
876 {
877         tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
878 }
879
880 void tipc_acknowledge(u32 ref, u32 ack)
881 {
882         struct tipc_port *p_ptr;
883         struct sk_buff *buf = NULL;
884
885         p_ptr = tipc_port_lock(ref);
886         if (!p_ptr)
887                 return;
888         if (p_ptr->connected) {
889                 p_ptr->conn_unacked -= ack;
890                 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
891         }
892         tipc_port_unlock(p_ptr);
893         tipc_net_route_msg(buf);
894 }
895
896 /*
897  * tipc_createport(): user level call.
898  */
899 int tipc_createport(void *usr_handle,
900                     unsigned int importance,
901                     tipc_msg_err_event error_cb,
902                     tipc_named_msg_err_event named_error_cb,
903                     tipc_conn_shutdown_event conn_error_cb,
904                     tipc_msg_event msg_cb,
905                     tipc_named_msg_event named_msg_cb,
906                     tipc_conn_msg_event conn_msg_cb,
907                     tipc_continue_event continue_event_cb, /* May be zero */
908                     u32 *portref)
909 {
910         struct user_port *up_ptr;
911         struct tipc_port *p_ptr;
912
913         up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
914         if (!up_ptr) {
915                 pr_warn("Port creation failed, no memory\n");
916                 return -ENOMEM;
917         }
918         p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup,
919                                     importance);
920         if (!p_ptr) {
921                 kfree(up_ptr);
922                 return -ENOMEM;
923         }
924
925         p_ptr->user_port = up_ptr;
926         up_ptr->usr_handle = usr_handle;
927         up_ptr->ref = p_ptr->ref;
928         up_ptr->err_cb = error_cb;
929         up_ptr->named_err_cb = named_error_cb;
930         up_ptr->conn_err_cb = conn_error_cb;
931         up_ptr->msg_cb = msg_cb;
932         up_ptr->named_msg_cb = named_msg_cb;
933         up_ptr->conn_msg_cb = conn_msg_cb;
934         up_ptr->continue_event_cb = continue_event_cb;
935         *portref = p_ptr->ref;
936         tipc_port_unlock(p_ptr);
937         return 0;
938 }
939
940 int tipc_portimportance(u32 ref, unsigned int *importance)
941 {
942         struct tipc_port *p_ptr;
943
944         p_ptr = tipc_port_lock(ref);
945         if (!p_ptr)
946                 return -EINVAL;
947         *importance = (unsigned int)msg_importance(&p_ptr->phdr);
948         tipc_port_unlock(p_ptr);
949         return 0;
950 }
951
952 int tipc_set_portimportance(u32 ref, unsigned int imp)
953 {
954         struct tipc_port *p_ptr;
955
956         if (imp > TIPC_CRITICAL_IMPORTANCE)
957                 return -EINVAL;
958
959         p_ptr = tipc_port_lock(ref);
960         if (!p_ptr)
961                 return -EINVAL;
962         msg_set_importance(&p_ptr->phdr, (u32)imp);
963         tipc_port_unlock(p_ptr);
964         return 0;
965 }
966
967
968 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
969 {
970         struct tipc_port *p_ptr;
971         struct publication *publ;
972         u32 key;
973         int res = -EINVAL;
974
975         p_ptr = tipc_port_lock(ref);
976         if (!p_ptr)
977                 return -EINVAL;
978
979         if (p_ptr->connected)
980                 goto exit;
981         key = ref + p_ptr->pub_count + 1;
982         if (key == ref) {
983                 res = -EADDRINUSE;
984                 goto exit;
985         }
986         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
987                                     scope, p_ptr->ref, key);
988         if (publ) {
989                 list_add(&publ->pport_list, &p_ptr->publications);
990                 p_ptr->pub_count++;
991                 p_ptr->published = 1;
992                 res = 0;
993         }
994 exit:
995         tipc_port_unlock(p_ptr);
996         return res;
997 }
998
999 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1000 {
1001         struct tipc_port *p_ptr;
1002         struct publication *publ;
1003         struct publication *tpubl;
1004         int res = -EINVAL;
1005
1006         p_ptr = tipc_port_lock(ref);
1007         if (!p_ptr)
1008                 return -EINVAL;
1009         if (!seq) {
1010                 list_for_each_entry_safe(publ, tpubl,
1011                                          &p_ptr->publications, pport_list) {
1012                         tipc_nametbl_withdraw(publ->type, publ->lower,
1013                                               publ->ref, publ->key);
1014                 }
1015                 res = 0;
1016         } else {
1017                 list_for_each_entry_safe(publ, tpubl,
1018                                          &p_ptr->publications, pport_list) {
1019                         if (publ->scope != scope)
1020                                 continue;
1021                         if (publ->type != seq->type)
1022                                 continue;
1023                         if (publ->lower != seq->lower)
1024                                 continue;
1025                         if (publ->upper != seq->upper)
1026                                 break;
1027                         tipc_nametbl_withdraw(publ->type, publ->lower,
1028                                               publ->ref, publ->key);
1029                         res = 0;
1030                         break;
1031                 }
1032         }
1033         if (list_empty(&p_ptr->publications))
1034                 p_ptr->published = 0;
1035         tipc_port_unlock(p_ptr);
1036         return res;
1037 }
1038
1039 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1040 {
1041         struct tipc_port *p_ptr;
1042         struct tipc_msg *msg;
1043         int res = -EINVAL;
1044
1045         p_ptr = tipc_port_lock(ref);
1046         if (!p_ptr)
1047                 return -EINVAL;
1048         if (p_ptr->published || p_ptr->connected)
1049                 goto exit;
1050         if (!peer->ref)
1051                 goto exit;
1052
1053         msg = &p_ptr->phdr;
1054         msg_set_destnode(msg, peer->node);
1055         msg_set_destport(msg, peer->ref);
1056         msg_set_type(msg, TIPC_CONN_MSG);
1057         msg_set_lookup_scope(msg, 0);
1058         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1059
1060         p_ptr->probing_interval = PROBING_INTERVAL;
1061         p_ptr->probing_state = CONFIRMED;
1062         p_ptr->connected = 1;
1063         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1064
1065         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1066                           (void *)(unsigned long)ref,
1067                           (net_ev_handler)port_handle_node_down);
1068         res = 0;
1069 exit:
1070         tipc_port_unlock(p_ptr);
1071         p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1072         return res;
1073 }
1074
1075 /**
1076  * tipc_disconnect_port - disconnect port from peer
1077  *
1078  * Port must be locked.
1079  */
1080 int tipc_disconnect_port(struct tipc_port *tp_ptr)
1081 {
1082         int res;
1083
1084         if (tp_ptr->connected) {
1085                 tp_ptr->connected = 0;
1086                 /* let timer expire on it's own to avoid deadlock! */
1087                 tipc_nodesub_unsubscribe(&tp_ptr->subscription);
1088                 res = 0;
1089         } else {
1090                 res = -ENOTCONN;
1091         }
1092         return res;
1093 }
1094
1095 /*
1096  * tipc_disconnect(): Disconnect port form peer.
1097  *                    This is a node local operation.
1098  */
1099 int tipc_disconnect(u32 ref)
1100 {
1101         struct tipc_port *p_ptr;
1102         int res;
1103
1104         p_ptr = tipc_port_lock(ref);
1105         if (!p_ptr)
1106                 return -EINVAL;
1107         res = tipc_disconnect_port(p_ptr);
1108         tipc_port_unlock(p_ptr);
1109         return res;
1110 }
1111
1112 /*
1113  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1114  */
1115 int tipc_shutdown(u32 ref)
1116 {
1117         struct tipc_port *p_ptr;
1118         struct sk_buff *buf = NULL;
1119
1120         p_ptr = tipc_port_lock(ref);
1121         if (!p_ptr)
1122                 return -EINVAL;
1123
1124         buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
1125         tipc_port_unlock(p_ptr);
1126         tipc_net_route_msg(buf);
1127         return tipc_disconnect(ref);
1128 }
1129
1130 /**
1131  * tipc_port_recv_msg - receive message from lower layer and deliver to port user
1132  */
1133 int tipc_port_recv_msg(struct sk_buff *buf)
1134 {
1135         struct tipc_port *p_ptr;
1136         struct tipc_msg *msg = buf_msg(buf);
1137         u32 destport = msg_destport(msg);
1138         u32 dsz = msg_data_sz(msg);
1139         u32 err;
1140
1141         /* forward unresolved named message */
1142         if (unlikely(!destport)) {
1143                 tipc_net_route_msg(buf);
1144                 return dsz;
1145         }
1146
1147         /* validate destination & pass to port, otherwise reject message */
1148         p_ptr = tipc_port_lock(destport);
1149         if (likely(p_ptr)) {
1150                 err = p_ptr->dispatcher(p_ptr, buf);
1151                 tipc_port_unlock(p_ptr);
1152                 if (likely(!err))
1153                         return dsz;
1154         } else {
1155                 err = TIPC_ERR_NO_PORT;
1156         }
1157
1158         return tipc_reject_msg(buf, err);
1159 }
1160
1161 /*
1162  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1163  *                        message for this node.
1164  */
1165 static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1166                                    struct iovec const *msg_sect,
1167                                    unsigned int total_len)
1168 {
1169         struct sk_buff *buf;
1170         int res;
1171
1172         res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1173                         MAX_MSG_SIZE, !sender->user_port, &buf);
1174         if (likely(buf))
1175                 tipc_port_recv_msg(buf);
1176         return res;
1177 }
1178
1179 /**
1180  * tipc_send - send message sections on connection
1181  */
1182 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1183               unsigned int total_len)
1184 {
1185         struct tipc_port *p_ptr;
1186         u32 destnode;
1187         int res;
1188
1189         p_ptr = tipc_port_deref(ref);
1190         if (!p_ptr || !p_ptr->connected)
1191                 return -EINVAL;
1192
1193         p_ptr->congested = 1;
1194         if (!tipc_port_congested(p_ptr)) {
1195                 destnode = port_peernode(p_ptr);
1196                 if (likely(!in_own_node(destnode)))
1197                         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1198                                                            total_len, destnode);
1199                 else
1200                         res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1201                                                       total_len);
1202
1203                 if (likely(res != -ELINKCONG)) {
1204                         p_ptr->congested = 0;
1205                         if (res > 0)
1206                                 p_ptr->sent++;
1207                         return res;
1208                 }
1209         }
1210         if (port_unreliable(p_ptr)) {
1211                 p_ptr->congested = 0;
1212                 return total_len;
1213         }
1214         return -ELINKCONG;
1215 }
1216
1217 /**
1218  * tipc_send2name - send message sections to port name
1219  */
1220 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1221                    unsigned int num_sect, struct iovec const *msg_sect,
1222                    unsigned int total_len)
1223 {
1224         struct tipc_port *p_ptr;
1225         struct tipc_msg *msg;
1226         u32 destnode = domain;
1227         u32 destport;
1228         int res;
1229
1230         p_ptr = tipc_port_deref(ref);
1231         if (!p_ptr || p_ptr->connected)
1232                 return -EINVAL;
1233
1234         msg = &p_ptr->phdr;
1235         msg_set_type(msg, TIPC_NAMED_MSG);
1236         msg_set_hdr_sz(msg, NAMED_H_SIZE);
1237         msg_set_nametype(msg, name->type);
1238         msg_set_nameinst(msg, name->instance);
1239         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1240         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1241         msg_set_destnode(msg, destnode);
1242         msg_set_destport(msg, destport);
1243
1244         if (likely(destport || destnode)) {
1245                 if (likely(in_own_node(destnode)))
1246                         res = tipc_port_recv_sections(p_ptr, num_sect,
1247                                                       msg_sect, total_len);
1248                 else if (tipc_own_addr)
1249                         res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1250                                                            num_sect, total_len,
1251                                                            destnode);
1252                 else
1253                         res = tipc_port_reject_sections(p_ptr, msg, msg_sect,
1254                                                         num_sect, total_len,
1255                                                         TIPC_ERR_NO_NODE);
1256                 if (likely(res != -ELINKCONG)) {
1257                         if (res > 0)
1258                                 p_ptr->sent++;
1259                         return res;
1260                 }
1261                 if (port_unreliable(p_ptr)) {
1262                         return total_len;
1263                 }
1264                 return -ELINKCONG;
1265         }
1266         return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1267                                          total_len, TIPC_ERR_NO_NAME);
1268 }
1269
1270 /**
1271  * tipc_send2port - send message sections to port identity
1272  */
1273 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1274                    unsigned int num_sect, struct iovec const *msg_sect,
1275                    unsigned int total_len)
1276 {
1277         struct tipc_port *p_ptr;
1278         struct tipc_msg *msg;
1279         int res;
1280
1281         p_ptr = tipc_port_deref(ref);
1282         if (!p_ptr || p_ptr->connected)
1283                 return -EINVAL;
1284
1285         msg = &p_ptr->phdr;
1286         msg_set_type(msg, TIPC_DIRECT_MSG);
1287         msg_set_lookup_scope(msg, 0);
1288         msg_set_destnode(msg, dest->node);
1289         msg_set_destport(msg, dest->ref);
1290         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1291
1292         if (in_own_node(dest->node))
1293                 res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1294                                                total_len);
1295         else if (tipc_own_addr)
1296                 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1297                                                    total_len, dest->node);
1298         else
1299                 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1300                                                 total_len, TIPC_ERR_NO_NODE);
1301         if (likely(res != -ELINKCONG)) {
1302                 if (res > 0)
1303                         p_ptr->sent++;
1304                 return res;
1305         }
1306         if (port_unreliable(p_ptr)) {
1307                 return total_len;
1308         }
1309         return -ELINKCONG;
1310 }
1311
1312 /**
1313  * tipc_send_buf2port - send message buffer to port identity
1314  */
1315 int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1316                struct sk_buff *buf, unsigned int dsz)
1317 {
1318         struct tipc_port *p_ptr;
1319         struct tipc_msg *msg;
1320         int res;
1321
1322         p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1323         if (!p_ptr || p_ptr->connected)
1324                 return -EINVAL;
1325
1326         msg = &p_ptr->phdr;
1327         msg_set_type(msg, TIPC_DIRECT_MSG);
1328         msg_set_destnode(msg, dest->node);
1329         msg_set_destport(msg, dest->ref);
1330         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1331         msg_set_size(msg, BASIC_H_SIZE + dsz);
1332         if (skb_cow(buf, BASIC_H_SIZE))
1333                 return -ENOMEM;
1334
1335         skb_push(buf, BASIC_H_SIZE);
1336         skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
1337
1338         if (in_own_node(dest->node))
1339                 res = tipc_port_recv_msg(buf);
1340         else
1341                 res = tipc_send_buf_fast(buf, dest->node);
1342         if (likely(res != -ELINKCONG)) {
1343                 if (res > 0)
1344                         p_ptr->sent++;
1345                 return res;
1346         }
1347         if (port_unreliable(p_ptr))
1348                 return dsz;
1349         return -ELINKCONG;
1350 }