staging: lustre: assume a kernel build
[sfrench/cifs-2.6.git] / drivers / staging / lustre / lnet / lnet / lib-move.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/lnet/lib-move.c
37  *
38  * Data movement routines
39  */
40
41 #define DEBUG_SUBSYSTEM S_LNET
42
43 #include "../../include/linux/lnet/lib-lnet.h"
44
45 /** lnet message has credit and can be submitted to lnd for send/receive */
46 #define LNET_CREDIT_OK          0
47 /** lnet message is waiting for credit */
48 #define LNET_CREDIT_WAIT        1
49
50 static int local_nid_dist_zero = 1;
51 module_param(local_nid_dist_zero, int, 0444);
52 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
53
54 int
55 lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
56 {
57         lnet_test_peer_t *tp;
58         struct list_head *el;
59         struct list_head *next;
60         struct list_head cull;
61
62         /* NB: use lnet_net_lock(0) to serialize operations on test peers */
63         if (threshold) {
64                 /* Adding a new entry */
65                 LIBCFS_ALLOC(tp, sizeof(*tp));
66                 if (!tp)
67                         return -ENOMEM;
68
69                 tp->tp_nid = nid;
70                 tp->tp_threshold = threshold;
71
72                 lnet_net_lock(0);
73                 list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
74                 lnet_net_unlock(0);
75                 return 0;
76         }
77
78         /* removing entries */
79         INIT_LIST_HEAD(&cull);
80
81         lnet_net_lock(0);
82
83         list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
84                 tp = list_entry(el, lnet_test_peer_t, tp_list);
85
86                 if (!tp->tp_threshold ||    /* needs culling anyway */
87                     nid == LNET_NID_ANY ||       /* removing all entries */
88                     tp->tp_nid == nid) {          /* matched this one */
89                         list_del(&tp->tp_list);
90                         list_add(&tp->tp_list, &cull);
91                 }
92         }
93
94         lnet_net_unlock(0);
95
96         while (!list_empty(&cull)) {
97                 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
98
99                 list_del(&tp->tp_list);
100                 LIBCFS_FREE(tp, sizeof(*tp));
101         }
102         return 0;
103 }
104
105 static int
106 fail_peer(lnet_nid_t nid, int outgoing)
107 {
108         lnet_test_peer_t *tp;
109         struct list_head *el;
110         struct list_head *next;
111         struct list_head cull;
112         int fail = 0;
113
114         INIT_LIST_HEAD(&cull);
115
116         /* NB: use lnet_net_lock(0) to serialize operations on test peers */
117         lnet_net_lock(0);
118
119         list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
120                 tp = list_entry(el, lnet_test_peer_t, tp_list);
121
122                 if (!tp->tp_threshold) {
123                         /* zombie entry */
124                         if (outgoing) {
125                                 /*
126                                  * only cull zombies on outgoing tests,
127                                  * since we may be at interrupt priority on
128                                  * incoming messages.
129                                  */
130                                 list_del(&tp->tp_list);
131                                 list_add(&tp->tp_list, &cull);
132                         }
133                         continue;
134                 }
135
136                 if (tp->tp_nid == LNET_NID_ANY || /* fail every peer */
137                     nid == tp->tp_nid) {        /* fail this peer */
138                         fail = 1;
139
140                         if (tp->tp_threshold != LNET_MD_THRESH_INF) {
141                                 tp->tp_threshold--;
142                                 if (outgoing &&
143                                     !tp->tp_threshold) {
144                                         /* see above */
145                                         list_del(&tp->tp_list);
146                                         list_add(&tp->tp_list, &cull);
147                                 }
148                         }
149                         break;
150                 }
151         }
152
153         lnet_net_unlock(0);
154
155         while (!list_empty(&cull)) {
156                 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
157                 list_del(&tp->tp_list);
158
159                 LIBCFS_FREE(tp, sizeof(*tp));
160         }
161
162         return fail;
163 }
164
165 unsigned int
166 lnet_iov_nob(unsigned int niov, struct kvec *iov)
167 {
168         unsigned int nob = 0;
169
170         while (niov-- > 0)
171                 nob += (iov++)->iov_len;
172
173         return nob;
174 }
175 EXPORT_SYMBOL(lnet_iov_nob);
176
177 void
178 lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
179                   unsigned int nsiov, struct kvec *siov, unsigned int soffset,
180                   unsigned int nob)
181 {
182         /* NB diov, siov are READ-ONLY */
183         unsigned int this_nob;
184
185         if (!nob)
186                 return;
187
188         /* skip complete frags before 'doffset' */
189         LASSERT(ndiov > 0);
190         while (doffset >= diov->iov_len) {
191                 doffset -= diov->iov_len;
192                 diov++;
193                 ndiov--;
194                 LASSERT(ndiov > 0);
195         }
196
197         /* skip complete frags before 'soffset' */
198         LASSERT(nsiov > 0);
199         while (soffset >= siov->iov_len) {
200                 soffset -= siov->iov_len;
201                 siov++;
202                 nsiov--;
203                 LASSERT(nsiov > 0);
204         }
205
206         do {
207                 LASSERT(ndiov > 0);
208                 LASSERT(nsiov > 0);
209                 this_nob = min(diov->iov_len - doffset,
210                                siov->iov_len - soffset);
211                 this_nob = min(this_nob, nob);
212
213                 memcpy((char *)diov->iov_base + doffset,
214                        (char *)siov->iov_base + soffset, this_nob);
215                 nob -= this_nob;
216
217                 if (diov->iov_len > doffset + this_nob) {
218                         doffset += this_nob;
219                 } else {
220                         diov++;
221                         ndiov--;
222                         doffset = 0;
223                 }
224
225                 if (siov->iov_len > soffset + this_nob) {
226                         soffset += this_nob;
227                 } else {
228                         siov++;
229                         nsiov--;
230                         soffset = 0;
231                 }
232         } while (nob > 0);
233 }
234 EXPORT_SYMBOL(lnet_copy_iov2iov);
235
236 int
237 lnet_extract_iov(int dst_niov, struct kvec *dst,
238                  int src_niov, struct kvec *src,
239                  unsigned int offset, unsigned int len)
240 {
241         /*
242          * Initialise 'dst' to the subset of 'src' starting at 'offset',
243          * for exactly 'len' bytes, and return the number of entries.
244          * NB not destructive to 'src'
245          */
246         unsigned int frag_len;
247         unsigned int niov;
248
249         if (!len)                          /* no data => */
250                 return 0;                    /* no frags */
251
252         LASSERT(src_niov > 0);
253         while (offset >= src->iov_len) {      /* skip initial frags */
254                 offset -= src->iov_len;
255                 src_niov--;
256                 src++;
257                 LASSERT(src_niov > 0);
258         }
259
260         niov = 1;
261         for (;;) {
262                 LASSERT(src_niov > 0);
263                 LASSERT((int)niov <= dst_niov);
264
265                 frag_len = src->iov_len - offset;
266                 dst->iov_base = ((char *)src->iov_base) + offset;
267
268                 if (len <= frag_len) {
269                         dst->iov_len = len;
270                         return niov;
271                 }
272
273                 dst->iov_len = frag_len;
274
275                 len -= frag_len;
276                 dst++;
277                 src++;
278                 niov++;
279                 src_niov--;
280                 offset = 0;
281         }
282 }
283 EXPORT_SYMBOL(lnet_extract_iov);
284
285 unsigned int
286 lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
287 {
288         unsigned int nob = 0;
289
290         while (niov-- > 0)
291                 nob += (kiov++)->kiov_len;
292
293         return nob;
294 }
295 EXPORT_SYMBOL(lnet_kiov_nob);
296
297 void
298 lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
299                     unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
300                     unsigned int nob)
301 {
302         /* NB diov, siov are READ-ONLY */
303         unsigned int this_nob;
304         char *daddr = NULL;
305         char *saddr = NULL;
306
307         if (!nob)
308                 return;
309
310         LASSERT(!in_interrupt());
311
312         LASSERT(ndiov > 0);
313         while (doffset >= diov->kiov_len) {
314                 doffset -= diov->kiov_len;
315                 diov++;
316                 ndiov--;
317                 LASSERT(ndiov > 0);
318         }
319
320         LASSERT(nsiov > 0);
321         while (soffset >= siov->kiov_len) {
322                 soffset -= siov->kiov_len;
323                 siov++;
324                 nsiov--;
325                 LASSERT(nsiov > 0);
326         }
327
328         do {
329                 LASSERT(ndiov > 0);
330                 LASSERT(nsiov > 0);
331                 this_nob = min(diov->kiov_len - doffset,
332                                siov->kiov_len - soffset);
333                 this_nob = min(this_nob, nob);
334
335                 if (!daddr)
336                         daddr = ((char *)kmap(diov->kiov_page)) +
337                                 diov->kiov_offset + doffset;
338                 if (!saddr)
339                         saddr = ((char *)kmap(siov->kiov_page)) +
340                                 siov->kiov_offset + soffset;
341
342                 /*
343                  * Vanishing risk of kmap deadlock when mapping 2 pages.
344                  * However in practice at least one of the kiovs will be mapped
345                  * kernel pages and the map/unmap will be NOOPs
346                  */
347                 memcpy(daddr, saddr, this_nob);
348                 nob -= this_nob;
349
350                 if (diov->kiov_len > doffset + this_nob) {
351                         daddr += this_nob;
352                         doffset += this_nob;
353                 } else {
354                         kunmap(diov->kiov_page);
355                         daddr = NULL;
356                         diov++;
357                         ndiov--;
358                         doffset = 0;
359                 }
360
361                 if (siov->kiov_len > soffset + this_nob) {
362                         saddr += this_nob;
363                         soffset += this_nob;
364                 } else {
365                         kunmap(siov->kiov_page);
366                         saddr = NULL;
367                         siov++;
368                         nsiov--;
369                         soffset = 0;
370                 }
371         } while (nob > 0);
372
373         if (daddr)
374                 kunmap(diov->kiov_page);
375         if (saddr)
376                 kunmap(siov->kiov_page);
377 }
378 EXPORT_SYMBOL(lnet_copy_kiov2kiov);
379
380 void
381 lnet_copy_kiov2iov(unsigned int niov, struct kvec *iov, unsigned int iovoffset,
382                    unsigned int nkiov, lnet_kiov_t *kiov,
383                    unsigned int kiovoffset, unsigned int nob)
384 {
385         /* NB iov, kiov are READ-ONLY */
386         unsigned int this_nob;
387         char *addr = NULL;
388
389         if (!nob)
390                 return;
391
392         LASSERT(!in_interrupt());
393
394         LASSERT(niov > 0);
395         while (iovoffset >= iov->iov_len) {
396                 iovoffset -= iov->iov_len;
397                 iov++;
398                 niov--;
399                 LASSERT(niov > 0);
400         }
401
402         LASSERT(nkiov > 0);
403         while (kiovoffset >= kiov->kiov_len) {
404                 kiovoffset -= kiov->kiov_len;
405                 kiov++;
406                 nkiov--;
407                 LASSERT(nkiov > 0);
408         }
409
410         do {
411                 LASSERT(niov > 0);
412                 LASSERT(nkiov > 0);
413                 this_nob = min(iov->iov_len - iovoffset,
414                                (__kernel_size_t) kiov->kiov_len - kiovoffset);
415                 this_nob = min(this_nob, nob);
416
417                 if (!addr)
418                         addr = ((char *)kmap(kiov->kiov_page)) +
419                                 kiov->kiov_offset + kiovoffset;
420
421                 memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
422                 nob -= this_nob;
423
424                 if (iov->iov_len > iovoffset + this_nob) {
425                         iovoffset += this_nob;
426                 } else {
427                         iov++;
428                         niov--;
429                         iovoffset = 0;
430                 }
431
432                 if (kiov->kiov_len > kiovoffset + this_nob) {
433                         addr += this_nob;
434                         kiovoffset += this_nob;
435                 } else {
436                         kunmap(kiov->kiov_page);
437                         addr = NULL;
438                         kiov++;
439                         nkiov--;
440                         kiovoffset = 0;
441                 }
442
443         } while (nob > 0);
444
445         if (addr)
446                 kunmap(kiov->kiov_page);
447 }
448 EXPORT_SYMBOL(lnet_copy_kiov2iov);
449
450 void
451 lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov,
452                    unsigned int kiovoffset, unsigned int niov,
453                    struct kvec *iov, unsigned int iovoffset,
454                    unsigned int nob)
455 {
456         /* NB kiov, iov are READ-ONLY */
457         unsigned int this_nob;
458         char *addr = NULL;
459
460         if (!nob)
461                 return;
462
463         LASSERT(!in_interrupt());
464
465         LASSERT(nkiov > 0);
466         while (kiovoffset >= kiov->kiov_len) {
467                 kiovoffset -= kiov->kiov_len;
468                 kiov++;
469                 nkiov--;
470                 LASSERT(nkiov > 0);
471         }
472
473         LASSERT(niov > 0);
474         while (iovoffset >= iov->iov_len) {
475                 iovoffset -= iov->iov_len;
476                 iov++;
477                 niov--;
478                 LASSERT(niov > 0);
479         }
480
481         do {
482                 LASSERT(nkiov > 0);
483                 LASSERT(niov > 0);
484                 this_nob = min((__kernel_size_t) kiov->kiov_len - kiovoffset,
485                                iov->iov_len - iovoffset);
486                 this_nob = min(this_nob, nob);
487
488                 if (!addr)
489                         addr = ((char *)kmap(kiov->kiov_page)) +
490                                 kiov->kiov_offset + kiovoffset;
491
492                 memcpy(addr, (char *)iov->iov_base + iovoffset, this_nob);
493                 nob -= this_nob;
494
495                 if (kiov->kiov_len > kiovoffset + this_nob) {
496                         addr += this_nob;
497                         kiovoffset += this_nob;
498                 } else {
499                         kunmap(kiov->kiov_page);
500                         addr = NULL;
501                         kiov++;
502                         nkiov--;
503                         kiovoffset = 0;
504                 }
505
506                 if (iov->iov_len > iovoffset + this_nob) {
507                         iovoffset += this_nob;
508                 } else {
509                         iov++;
510                         niov--;
511                         iovoffset = 0;
512                 }
513         } while (nob > 0);
514
515         if (addr)
516                 kunmap(kiov->kiov_page);
517 }
518 EXPORT_SYMBOL(lnet_copy_iov2kiov);
519
520 int
521 lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
522                   int src_niov, lnet_kiov_t *src,
523                   unsigned int offset, unsigned int len)
524 {
525         /*
526          * Initialise 'dst' to the subset of 'src' starting at 'offset',
527          * for exactly 'len' bytes, and return the number of entries.
528          * NB not destructive to 'src'
529          */
530         unsigned int frag_len;
531         unsigned int niov;
532
533         if (!len)                          /* no data => */
534                 return 0;                    /* no frags */
535
536         LASSERT(src_niov > 0);
537         while (offset >= src->kiov_len) {      /* skip initial frags */
538                 offset -= src->kiov_len;
539                 src_niov--;
540                 src++;
541                 LASSERT(src_niov > 0);
542         }
543
544         niov = 1;
545         for (;;) {
546                 LASSERT(src_niov > 0);
547                 LASSERT((int)niov <= dst_niov);
548
549                 frag_len = src->kiov_len - offset;
550                 dst->kiov_page = src->kiov_page;
551                 dst->kiov_offset = src->kiov_offset + offset;
552
553                 if (len <= frag_len) {
554                         dst->kiov_len = len;
555                         LASSERT(dst->kiov_offset + dst->kiov_len
556                                         <= PAGE_CACHE_SIZE);
557                         return niov;
558                 }
559
560                 dst->kiov_len = frag_len;
561                 LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_CACHE_SIZE);
562
563                 len -= frag_len;
564                 dst++;
565                 src++;
566                 niov++;
567                 src_niov--;
568                 offset = 0;
569         }
570 }
571 EXPORT_SYMBOL(lnet_extract_kiov);
572
573 static void
574 lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
575              unsigned int offset, unsigned int mlen, unsigned int rlen)
576 {
577         unsigned int niov = 0;
578         struct kvec *iov = NULL;
579         lnet_kiov_t *kiov = NULL;
580         int rc;
581
582         LASSERT(!in_interrupt());
583         LASSERT(!mlen || msg);
584
585         if (msg) {
586                 LASSERT(msg->msg_receiving);
587                 LASSERT(!msg->msg_sending);
588                 LASSERT(rlen == msg->msg_len);
589                 LASSERT(mlen <= msg->msg_len);
590                 LASSERT(msg->msg_offset == offset);
591                 LASSERT(msg->msg_wanted == mlen);
592
593                 msg->msg_receiving = 0;
594
595                 if (mlen) {
596                         niov = msg->msg_niov;
597                         iov  = msg->msg_iov;
598                         kiov = msg->msg_kiov;
599
600                         LASSERT(niov > 0);
601                         LASSERT(!iov != !kiov);
602                 }
603         }
604
605         rc = ni->ni_lnd->lnd_recv(ni, private, msg, delayed,
606                                   niov, iov, kiov, offset, mlen, rlen);
607         if (rc < 0)
608                 lnet_finalize(ni, msg, rc);
609 }
610
611 static void
612 lnet_setpayloadbuffer(lnet_msg_t *msg)
613 {
614         lnet_libmd_t *md = msg->msg_md;
615
616         LASSERT(msg->msg_len > 0);
617         LASSERT(!msg->msg_routing);
618         LASSERT(md);
619         LASSERT(!msg->msg_niov);
620         LASSERT(!msg->msg_iov);
621         LASSERT(!msg->msg_kiov);
622
623         msg->msg_niov = md->md_niov;
624         if (md->md_options & LNET_MD_KIOV)
625                 msg->msg_kiov = md->md_iov.kiov;
626         else
627                 msg->msg_iov = md->md_iov.iov;
628 }
629
630 void
631 lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
632                unsigned int offset, unsigned int len)
633 {
634         msg->msg_type = type;
635         msg->msg_target = target;
636         msg->msg_len = len;
637         msg->msg_offset = offset;
638
639         if (len)
640                 lnet_setpayloadbuffer(msg);
641
642         memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
643         msg->msg_hdr.type          = cpu_to_le32(type);
644         msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
645         msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
646         /* src_nid will be set later */
647         msg->msg_hdr.src_pid    = cpu_to_le32(the_lnet.ln_pid);
648         msg->msg_hdr.payload_length = cpu_to_le32(len);
649 }
650
651 static void
652 lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
653 {
654         void *priv = msg->msg_private;
655         int rc;
656
657         LASSERT(!in_interrupt());
658         LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
659                 (msg->msg_txcredit && msg->msg_peertxcredit));
660
661         rc = ni->ni_lnd->lnd_send(ni, priv, msg);
662         if (rc < 0)
663                 lnet_finalize(ni, msg, rc);
664 }
665
666 static int
667 lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
668 {
669         int rc;
670
671         LASSERT(!msg->msg_sending);
672         LASSERT(msg->msg_receiving);
673         LASSERT(!msg->msg_rx_ready_delay);
674         LASSERT(ni->ni_lnd->lnd_eager_recv);
675
676         msg->msg_rx_ready_delay = 1;
677         rc = ni->ni_lnd->lnd_eager_recv(ni, msg->msg_private, msg,
678                                         &msg->msg_private);
679         if (rc) {
680                 CERROR("recv from %s / send to %s aborted: eager_recv failed %d\n",
681                        libcfs_nid2str(msg->msg_rxpeer->lp_nid),
682                        libcfs_id2str(msg->msg_target), rc);
683                 LASSERT(rc < 0); /* required by my callers */
684         }
685
686         return rc;
687 }
688
689 /* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
690 static void
691 lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
692 {
693         unsigned long last_alive = 0;
694
695         LASSERT(lnet_peer_aliveness_enabled(lp));
696         LASSERT(ni->ni_lnd->lnd_query);
697
698         lnet_net_unlock(lp->lp_cpt);
699         ni->ni_lnd->lnd_query(ni, lp->lp_nid, &last_alive);
700         lnet_net_lock(lp->lp_cpt);
701
702         lp->lp_last_query = cfs_time_current();
703
704         if (last_alive) /* NI has updated timestamp */
705                 lp->lp_last_alive = last_alive;
706 }
707
708 /* NB: always called with lnet_net_lock held */
709 static inline int
710 lnet_peer_is_alive(lnet_peer_t *lp, unsigned long now)
711 {
712         int alive;
713         unsigned long deadline;
714
715         LASSERT(lnet_peer_aliveness_enabled(lp));
716
717         /* Trust lnet_notify() if it has more recent aliveness news, but
718          * ignore the initial assumed death (see lnet_peers_start_down()).
719          */
720         if (!lp->lp_alive && lp->lp_alive_count > 0 &&
721             cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
722                 return 0;
723
724         deadline = cfs_time_add(lp->lp_last_alive,
725                                 cfs_time_seconds(lp->lp_ni->ni_peertimeout));
726         alive = cfs_time_after(deadline, now);
727
728         /* Update obsolete lp_alive except for routers assumed to be dead
729          * initially, because router checker would update aliveness in this
730          * case, and moreover lp_last_alive at peer creation is assumed.
731          */
732         if (alive && !lp->lp_alive &&
733             !(lnet_isrouter(lp) && !lp->lp_alive_count))
734                 lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
735
736         return alive;
737 }
738
739 /*
740  * NB: returns 1 when alive, 0 when dead, negative when error;
741  *     may drop the lnet_net_lock
742  */
743 static int
744 lnet_peer_alive_locked(lnet_peer_t *lp)
745 {
746         unsigned long now = cfs_time_current();
747
748         if (!lnet_peer_aliveness_enabled(lp))
749                 return -ENODEV;
750
751         if (lnet_peer_is_alive(lp, now))
752                 return 1;
753
754         /*
755          * Peer appears dead, but we should avoid frequent NI queries (at
756          * most once per lnet_queryinterval seconds).
757          */
758         if (lp->lp_last_query) {
759                 static const int lnet_queryinterval = 1;
760
761                 unsigned long next_query =
762                            cfs_time_add(lp->lp_last_query,
763                                         cfs_time_seconds(lnet_queryinterval));
764
765                 if (time_before(now, next_query)) {
766                         if (lp->lp_alive)
767                                 CWARN("Unexpected aliveness of peer %s: %d < %d (%d/%d)\n",
768                                       libcfs_nid2str(lp->lp_nid),
769                                       (int)now, (int)next_query,
770                                       lnet_queryinterval,
771                                       lp->lp_ni->ni_peertimeout);
772                         return 0;
773                 }
774         }
775
776         /* query NI for latest aliveness news */
777         lnet_ni_query_locked(lp->lp_ni, lp);
778
779         if (lnet_peer_is_alive(lp, now))
780                 return 1;
781
782         lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
783         return 0;
784 }
785
786 /**
787  * \param msg The message to be sent.
788  * \param do_send True if lnet_ni_send() should be called in this function.
789  *        lnet_send() is going to lnet_net_unlock immediately after this, so
790  *        it sets do_send FALSE and I don't do the unlock/send/lock bit.
791  *
792  * \retval LNET_CREDIT_OK If \a msg sent or OK to send.
793  * \retval LNET_CREDIT_WAIT If \a msg blocked for credit.
794  * \retval -EHOSTUNREACH If the next hop of the message appears dead.
795  * \retval -ECANCELED If the MD of the message has been unlinked.
796  */
797 static int
798 lnet_post_send_locked(lnet_msg_t *msg, int do_send)
799 {
800         lnet_peer_t *lp = msg->msg_txpeer;
801         lnet_ni_t *ni = lp->lp_ni;
802         int cpt = msg->msg_tx_cpt;
803         struct lnet_tx_queue *tq = ni->ni_tx_queues[cpt];
804
805         /* non-lnet_send() callers have checked before */
806         LASSERT(!do_send || msg->msg_tx_delayed);
807         LASSERT(!msg->msg_receiving);
808         LASSERT(msg->msg_tx_committed);
809
810         /* NB 'lp' is always the next hop */
811         if (!(msg->msg_target.pid & LNET_PID_USERFLAG) &&
812             !lnet_peer_alive_locked(lp)) {
813                 the_lnet.ln_counters[cpt]->drop_count++;
814                 the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
815                 lnet_net_unlock(cpt);
816
817                 CNETERR("Dropping message for %s: peer not alive\n",
818                         libcfs_id2str(msg->msg_target));
819                 if (do_send)
820                         lnet_finalize(ni, msg, -EHOSTUNREACH);
821
822                 lnet_net_lock(cpt);
823                 return -EHOSTUNREACH;
824         }
825
826         if (msg->msg_md &&
827             (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED)) {
828                 lnet_net_unlock(cpt);
829
830                 CNETERR("Aborting message for %s: LNetM[DE]Unlink() already called on the MD/ME.\n",
831                         libcfs_id2str(msg->msg_target));
832                 if (do_send)
833                         lnet_finalize(ni, msg, -ECANCELED);
834
835                 lnet_net_lock(cpt);
836                 return -ECANCELED;
837         }
838
839         if (!msg->msg_peertxcredit) {
840                 LASSERT((lp->lp_txcredits < 0) ==
841                         !list_empty(&lp->lp_txq));
842
843                 msg->msg_peertxcredit = 1;
844                 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
845                 lp->lp_txcredits--;
846
847                 if (lp->lp_txcredits < lp->lp_mintxcredits)
848                         lp->lp_mintxcredits = lp->lp_txcredits;
849
850                 if (lp->lp_txcredits < 0) {
851                         msg->msg_tx_delayed = 1;
852                         list_add_tail(&msg->msg_list, &lp->lp_txq);
853                         return LNET_CREDIT_WAIT;
854                 }
855         }
856
857         if (!msg->msg_txcredit) {
858                 LASSERT((tq->tq_credits < 0) ==
859                         !list_empty(&tq->tq_delayed));
860
861                 msg->msg_txcredit = 1;
862                 tq->tq_credits--;
863
864                 if (tq->tq_credits < tq->tq_credits_min)
865                         tq->tq_credits_min = tq->tq_credits;
866
867                 if (tq->tq_credits < 0) {
868                         msg->msg_tx_delayed = 1;
869                         list_add_tail(&msg->msg_list, &tq->tq_delayed);
870                         return LNET_CREDIT_WAIT;
871                 }
872         }
873
874         if (do_send) {
875                 lnet_net_unlock(cpt);
876                 lnet_ni_send(ni, msg);
877                 lnet_net_lock(cpt);
878         }
879         return LNET_CREDIT_OK;
880 }
881
882 static lnet_rtrbufpool_t *
883 lnet_msg2bufpool(lnet_msg_t *msg)
884 {
885         lnet_rtrbufpool_t *rbp;
886         int cpt;
887
888         LASSERT(msg->msg_rx_committed);
889
890         cpt = msg->msg_rx_cpt;
891         rbp = &the_lnet.ln_rtrpools[cpt][0];
892
893         LASSERT(msg->msg_len <= LNET_MTU);
894         while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_CACHE_SIZE) {
895                 rbp++;
896                 LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
897         }
898
899         return rbp;
900 }
901
902 static int
903 lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
904 {
905         /*
906          * lnet_parse is going to lnet_net_unlock immediately after this, so it
907          * sets do_recv FALSE and I don't do the unlock/send/lock bit.
908          * I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
909          * received or OK to receive
910          */
911         lnet_peer_t *lp = msg->msg_rxpeer;
912         lnet_rtrbufpool_t *rbp;
913         lnet_rtrbuf_t *rb;
914
915         LASSERT(!msg->msg_iov);
916         LASSERT(!msg->msg_kiov);
917         LASSERT(!msg->msg_niov);
918         LASSERT(msg->msg_routing);
919         LASSERT(msg->msg_receiving);
920         LASSERT(!msg->msg_sending);
921
922         /* non-lnet_parse callers only receive delayed messages */
923         LASSERT(!do_recv || msg->msg_rx_delayed);
924
925         if (!msg->msg_peerrtrcredit) {
926                 LASSERT((lp->lp_rtrcredits < 0) ==
927                         !list_empty(&lp->lp_rtrq));
928
929                 msg->msg_peerrtrcredit = 1;
930                 lp->lp_rtrcredits--;
931                 if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
932                         lp->lp_minrtrcredits = lp->lp_rtrcredits;
933
934                 if (lp->lp_rtrcredits < 0) {
935                         /* must have checked eager_recv before here */
936                         LASSERT(msg->msg_rx_ready_delay);
937                         msg->msg_rx_delayed = 1;
938                         list_add_tail(&msg->msg_list, &lp->lp_rtrq);
939                         return LNET_CREDIT_WAIT;
940                 }
941         }
942
943         rbp = lnet_msg2bufpool(msg);
944
945         if (!msg->msg_rtrcredit) {
946                 msg->msg_rtrcredit = 1;
947                 rbp->rbp_credits--;
948                 if (rbp->rbp_credits < rbp->rbp_mincredits)
949                         rbp->rbp_mincredits = rbp->rbp_credits;
950
951                 if (rbp->rbp_credits < 0) {
952                         /* must have checked eager_recv before here */
953                         LASSERT(msg->msg_rx_ready_delay);
954                         msg->msg_rx_delayed = 1;
955                         list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
956                         return LNET_CREDIT_WAIT;
957                 }
958         }
959
960         LASSERT(!list_empty(&rbp->rbp_bufs));
961         rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
962         list_del(&rb->rb_list);
963
964         msg->msg_niov = rbp->rbp_npages;
965         msg->msg_kiov = &rb->rb_kiov[0];
966
967         if (do_recv) {
968                 int cpt = msg->msg_rx_cpt;
969
970                 lnet_net_unlock(cpt);
971                 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
972                              0, msg->msg_len, msg->msg_len);
973                 lnet_net_lock(cpt);
974         }
975         return LNET_CREDIT_OK;
976 }
977
978 void
979 lnet_return_tx_credits_locked(lnet_msg_t *msg)
980 {
981         lnet_peer_t *txpeer = msg->msg_txpeer;
982         lnet_msg_t *msg2;
983
984         if (msg->msg_txcredit) {
985                 struct lnet_ni *ni = txpeer->lp_ni;
986                 struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
987
988                 /* give back NI txcredits */
989                 msg->msg_txcredit = 0;
990
991                 LASSERT((tq->tq_credits < 0) ==
992                         !list_empty(&tq->tq_delayed));
993
994                 tq->tq_credits++;
995                 if (tq->tq_credits <= 0) {
996                         msg2 = list_entry(tq->tq_delayed.next,
997                                           lnet_msg_t, msg_list);
998                         list_del(&msg2->msg_list);
999
1000                         LASSERT(msg2->msg_txpeer->lp_ni == ni);
1001                         LASSERT(msg2->msg_tx_delayed);
1002
1003                         (void) lnet_post_send_locked(msg2, 1);
1004                 }
1005         }
1006
1007         if (msg->msg_peertxcredit) {
1008                 /* give back peer txcredits */
1009                 msg->msg_peertxcredit = 0;
1010
1011                 LASSERT((txpeer->lp_txcredits < 0) ==
1012                         !list_empty(&txpeer->lp_txq));
1013
1014                 txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
1015                 LASSERT(txpeer->lp_txqnob >= 0);
1016
1017                 txpeer->lp_txcredits++;
1018                 if (txpeer->lp_txcredits <= 0) {
1019                         msg2 = list_entry(txpeer->lp_txq.next,
1020                                           lnet_msg_t, msg_list);
1021                         list_del(&msg2->msg_list);
1022
1023                         LASSERT(msg2->msg_txpeer == txpeer);
1024                         LASSERT(msg2->msg_tx_delayed);
1025
1026                         (void) lnet_post_send_locked(msg2, 1);
1027                 }
1028         }
1029
1030         if (txpeer) {
1031                 msg->msg_txpeer = NULL;
1032                 lnet_peer_decref_locked(txpeer);
1033         }
1034 }
1035
1036 void
1037 lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
1038 {
1039         lnet_msg_t *msg;
1040
1041         if (list_empty(&rbp->rbp_msgs))
1042                 return;
1043         msg = list_entry(rbp->rbp_msgs.next,
1044                          lnet_msg_t, msg_list);
1045         list_del(&msg->msg_list);
1046
1047         (void)lnet_post_routed_recv_locked(msg, 1);
1048 }
1049
1050 void
1051 lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
1052 {
1053         struct list_head drop;
1054         lnet_msg_t *msg;
1055         lnet_msg_t *tmp;
1056
1057         INIT_LIST_HEAD(&drop);
1058
1059         list_splice_init(list, &drop);
1060
1061         lnet_net_unlock(cpt);
1062
1063         list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
1064                 lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
1065                              0, 0, 0, msg->msg_hdr.payload_length);
1066                 list_del_init(&msg->msg_list);
1067                 lnet_finalize(NULL, msg, -ECANCELED);
1068         }
1069
1070         lnet_net_lock(cpt);
1071 }
1072
1073 void
1074 lnet_return_rx_credits_locked(lnet_msg_t *msg)
1075 {
1076         lnet_peer_t *rxpeer = msg->msg_rxpeer;
1077         lnet_msg_t *msg2;
1078
1079         if (msg->msg_rtrcredit) {
1080                 /* give back global router credits */
1081                 lnet_rtrbuf_t *rb;
1082                 lnet_rtrbufpool_t *rbp;
1083
1084                 /*
1085                  * NB If a msg ever blocks for a buffer in rbp_msgs, it stays
1086                  * there until it gets one allocated, or aborts the wait
1087                  * itself
1088                  */
1089                 LASSERT(msg->msg_kiov);
1090
1091                 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
1092                 rbp = rb->rb_pool;
1093
1094                 msg->msg_kiov = NULL;
1095                 msg->msg_rtrcredit = 0;
1096
1097                 LASSERT(rbp == lnet_msg2bufpool(msg));
1098
1099                 LASSERT((rbp->rbp_credits > 0) ==
1100                         !list_empty(&rbp->rbp_bufs));
1101
1102                 /*
1103                  * If routing is now turned off, we just drop this buffer and
1104                  * don't bother trying to return credits.
1105                  */
1106                 if (!the_lnet.ln_routing) {
1107                         lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
1108                         goto routing_off;
1109                 }
1110
1111                 /*
1112                  * It is possible that a user has lowered the desired number of
1113                  * buffers in this pool.  Make sure we never put back
1114                  * more buffers than the stated number.
1115                  */
1116                 if (rbp->rbp_credits >= rbp->rbp_nbuffers) {
1117                         /* Discard this buffer so we don't have too many. */
1118                         lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
1119                 } else {
1120                         list_add(&rb->rb_list, &rbp->rbp_bufs);
1121                         rbp->rbp_credits++;
1122                         if (rbp->rbp_credits <= 0)
1123                                 lnet_schedule_blocked_locked(rbp);
1124                 }
1125         }
1126
1127 routing_off:
1128         if (msg->msg_peerrtrcredit) {
1129                 /* give back peer router credits */
1130                 msg->msg_peerrtrcredit = 0;
1131
1132                 LASSERT((rxpeer->lp_rtrcredits < 0) ==
1133                         !list_empty(&rxpeer->lp_rtrq));
1134
1135                 rxpeer->lp_rtrcredits++;
1136                 /*
1137                  * drop all messages which are queued to be routed on that
1138                  * peer.
1139                  */
1140                 if (!the_lnet.ln_routing) {
1141                         lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
1142                                                      msg->msg_rx_cpt);
1143                 } else if (rxpeer->lp_rtrcredits <= 0) {
1144                         msg2 = list_entry(rxpeer->lp_rtrq.next,
1145                                           lnet_msg_t, msg_list);
1146                         list_del(&msg2->msg_list);
1147
1148                         (void) lnet_post_routed_recv_locked(msg2, 1);
1149                 }
1150         }
1151         if (rxpeer) {
1152                 msg->msg_rxpeer = NULL;
1153                 lnet_peer_decref_locked(rxpeer);
1154         }
1155 }
1156
1157 static int
1158 lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
1159 {
1160         lnet_peer_t *p1 = r1->lr_gateway;
1161         lnet_peer_t *p2 = r2->lr_gateway;
1162
1163         if (r1->lr_priority < r2->lr_priority)
1164                 return 1;
1165
1166         if (r1->lr_priority > r2->lr_priority)
1167                 return -1;
1168
1169         if (r1->lr_hops < r2->lr_hops)
1170                 return 1;
1171
1172         if (r1->lr_hops > r2->lr_hops)
1173                 return -1;
1174
1175         if (p1->lp_txqnob < p2->lp_txqnob)
1176                 return 1;
1177
1178         if (p1->lp_txqnob > p2->lp_txqnob)
1179                 return -1;
1180
1181         if (p1->lp_txcredits > p2->lp_txcredits)
1182                 return 1;
1183
1184         if (p1->lp_txcredits < p2->lp_txcredits)
1185                 return -1;
1186
1187         if (r1->lr_seq - r2->lr_seq <= 0)
1188                 return 1;
1189
1190         return -1;
1191 }
1192
1193 static lnet_peer_t *
1194 lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
1195 {
1196         lnet_remotenet_t *rnet;
1197         lnet_route_t *route;
1198         lnet_route_t *best_route;
1199         lnet_route_t *last_route;
1200         struct lnet_peer *lp_best;
1201         struct lnet_peer *lp;
1202         int rc;
1203
1204         /*
1205          * If @rtr_nid is not LNET_NID_ANY, return the gateway with
1206          * rtr_nid nid, otherwise find the best gateway I can use
1207          */
1208         rnet = lnet_find_net_locked(LNET_NIDNET(target));
1209         if (!rnet)
1210                 return NULL;
1211
1212         lp_best = NULL;
1213         best_route = NULL;
1214         last_route = NULL;
1215         list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1216                 lp = route->lr_gateway;
1217
1218                 if (!lnet_is_route_alive(route))
1219                         continue;
1220
1221                 if (ni && lp->lp_ni != ni)
1222                         continue;
1223
1224                 if (lp->lp_nid == rtr_nid) /* it's pre-determined router */
1225                         return lp;
1226
1227                 if (!lp_best) {
1228                         best_route = route;
1229                         last_route = route;
1230                         lp_best = lp;
1231                         continue;
1232                 }
1233
1234                 /* no protection on below fields, but it's harmless */
1235                 if (last_route->lr_seq - route->lr_seq < 0)
1236                         last_route = route;
1237
1238                 rc = lnet_compare_routes(route, best_route);
1239                 if (rc < 0)
1240                         continue;
1241
1242                 best_route = route;
1243                 lp_best = lp;
1244         }
1245
1246         /*
1247          * set sequence number on the best router to the latest sequence + 1
1248          * so we can round-robin all routers, it's race and inaccurate but
1249          * harmless and functional
1250          */
1251         if (best_route)
1252                 best_route->lr_seq = last_route->lr_seq + 1;
1253         return lp_best;
1254 }
1255
1256 int
1257 lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
1258 {
1259         lnet_nid_t dst_nid = msg->msg_target.nid;
1260         struct lnet_ni *src_ni;
1261         struct lnet_ni *local_ni;
1262         struct lnet_peer *lp;
1263         int cpt;
1264         int cpt2;
1265         int rc;
1266
1267         /*
1268          * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
1269          * but we might want to use pre-determined router for ACK/REPLY
1270          * in the future
1271          */
1272         /* NB: ni == interface pre-determined (ACK/REPLY) */
1273         LASSERT(!msg->msg_txpeer);
1274         LASSERT(!msg->msg_sending);
1275         LASSERT(!msg->msg_target_is_router);
1276         LASSERT(!msg->msg_receiving);
1277
1278         msg->msg_sending = 1;
1279
1280         LASSERT(!msg->msg_tx_committed);
1281         cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
1282  again:
1283         lnet_net_lock(cpt);
1284
1285         if (the_lnet.ln_shutdown) {
1286                 lnet_net_unlock(cpt);
1287                 return -ESHUTDOWN;
1288         }
1289
1290         if (src_nid == LNET_NID_ANY) {
1291                 src_ni = NULL;
1292         } else {
1293                 src_ni = lnet_nid2ni_locked(src_nid, cpt);
1294                 if (!src_ni) {
1295                         lnet_net_unlock(cpt);
1296                         LCONSOLE_WARN("Can't send to %s: src %s is not a local nid\n",
1297                                       libcfs_nid2str(dst_nid),
1298                                       libcfs_nid2str(src_nid));
1299                         return -EINVAL;
1300                 }
1301                 LASSERT(!msg->msg_routing);
1302         }
1303
1304         /* Is this for someone on a local network? */
1305         local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
1306
1307         if (local_ni) {
1308                 if (!src_ni) {
1309                         src_ni = local_ni;
1310                         src_nid = src_ni->ni_nid;
1311                 } else if (src_ni == local_ni) {
1312                         lnet_ni_decref_locked(local_ni, cpt);
1313                 } else {
1314                         lnet_ni_decref_locked(local_ni, cpt);
1315                         lnet_ni_decref_locked(src_ni, cpt);
1316                         lnet_net_unlock(cpt);
1317                         LCONSOLE_WARN("No route to %s via from %s\n",
1318                                       libcfs_nid2str(dst_nid),
1319                                       libcfs_nid2str(src_nid));
1320                         return -EINVAL;
1321                 }
1322
1323                 LASSERT(src_nid != LNET_NID_ANY);
1324                 lnet_msg_commit(msg, cpt);
1325
1326                 if (!msg->msg_routing)
1327                         msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1328
1329                 if (src_ni == the_lnet.ln_loni) {
1330                         /* No send credit hassles with LOLND */
1331                         lnet_net_unlock(cpt);
1332                         lnet_ni_send(src_ni, msg);
1333
1334                         lnet_net_lock(cpt);
1335                         lnet_ni_decref_locked(src_ni, cpt);
1336                         lnet_net_unlock(cpt);
1337                         return 0;
1338                 }
1339
1340                 rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
1341                 /* lp has ref on src_ni; lose mine */
1342                 lnet_ni_decref_locked(src_ni, cpt);
1343                 if (rc) {
1344                         lnet_net_unlock(cpt);
1345                         LCONSOLE_WARN("Error %d finding peer %s\n", rc,
1346                                       libcfs_nid2str(dst_nid));
1347                         /* ENOMEM or shutting down */
1348                         return rc;
1349                 }
1350                 LASSERT(lp->lp_ni == src_ni);
1351         } else {
1352                 /* sending to a remote network */
1353                 lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
1354                 if (!lp) {
1355                         if (src_ni)
1356                                 lnet_ni_decref_locked(src_ni, cpt);
1357                         lnet_net_unlock(cpt);
1358
1359                         LCONSOLE_WARN("No route to %s via %s (all routers down)\n",
1360                                       libcfs_id2str(msg->msg_target),
1361                                       libcfs_nid2str(src_nid));
1362                         return -EHOSTUNREACH;
1363                 }
1364
1365                 /*
1366                  * rtr_nid is LNET_NID_ANY or NID of pre-determined router,
1367                  * it's possible that rtr_nid isn't LNET_NID_ANY and lp isn't
1368                  * pre-determined router, this can happen if router table
1369                  * was changed when we release the lock
1370                  */
1371                 if (rtr_nid != lp->lp_nid) {
1372                         cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
1373                         if (cpt2 != cpt) {
1374                                 if (src_ni)
1375                                         lnet_ni_decref_locked(src_ni, cpt);
1376                                 lnet_net_unlock(cpt);
1377
1378                                 rtr_nid = lp->lp_nid;
1379                                 cpt = cpt2;
1380                                 goto again;
1381                         }
1382                 }
1383
1384                 CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1385                        libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
1386                        lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1387
1388                 if (!src_ni) {
1389                         src_ni = lp->lp_ni;
1390                         src_nid = src_ni->ni_nid;
1391                 } else {
1392                         LASSERT(src_ni == lp->lp_ni);
1393                         lnet_ni_decref_locked(src_ni, cpt);
1394                 }
1395
1396                 lnet_peer_addref_locked(lp);
1397
1398                 LASSERT(src_nid != LNET_NID_ANY);
1399                 lnet_msg_commit(msg, cpt);
1400
1401                 if (!msg->msg_routing) {
1402                         /* I'm the source and now I know which NI to send on */
1403                         msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1404                 }
1405
1406                 msg->msg_target_is_router = 1;
1407                 msg->msg_target.nid = lp->lp_nid;
1408                 msg->msg_target.pid = LNET_PID_LUSTRE;
1409         }
1410
1411         /* 'lp' is our best choice of peer */
1412
1413         LASSERT(!msg->msg_peertxcredit);
1414         LASSERT(!msg->msg_txcredit);
1415         LASSERT(!msg->msg_txpeer);
1416
1417         msg->msg_txpeer = lp;              /* msg takes my ref on lp */
1418
1419         rc = lnet_post_send_locked(msg, 0);
1420         lnet_net_unlock(cpt);
1421
1422         if (rc < 0)
1423                 return rc;
1424
1425         if (rc == LNET_CREDIT_OK)
1426                 lnet_ni_send(src_ni, msg);
1427
1428         return 0; /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */
1429 }
1430
1431 static void
1432 lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
1433 {
1434         lnet_net_lock(cpt);
1435         the_lnet.ln_counters[cpt]->drop_count++;
1436         the_lnet.ln_counters[cpt]->drop_length += nob;
1437         lnet_net_unlock(cpt);
1438
1439         lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1440 }
1441
1442 static void
1443 lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
1444 {
1445         lnet_hdr_t *hdr = &msg->msg_hdr;
1446
1447         if (msg->msg_wanted)
1448                 lnet_setpayloadbuffer(msg);
1449
1450         lnet_build_msg_event(msg, LNET_EVENT_PUT);
1451
1452         /*
1453          * Must I ACK?  If so I'll grab the ack_wmd out of the header and put
1454          * it back into the ACK during lnet_finalize()
1455          */
1456         msg->msg_ack = !lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1457                        !(msg->msg_md->md_options & LNET_MD_ACK_DISABLE);
1458
1459         lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
1460                      msg->msg_offset, msg->msg_wanted, hdr->payload_length);
1461 }
1462
1463 static int
1464 lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1465 {
1466         lnet_hdr_t *hdr = &msg->msg_hdr;
1467         struct lnet_match_info info;
1468         int rc;
1469
1470         /* Convert put fields to host byte order */
1471         hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1472         hdr->msg.put.ptl_index  = le32_to_cpu(hdr->msg.put.ptl_index);
1473         hdr->msg.put.offset     = le32_to_cpu(hdr->msg.put.offset);
1474
1475         info.mi_id.nid  = hdr->src_nid;
1476         info.mi_id.pid  = hdr->src_pid;
1477         info.mi_opc     = LNET_MD_OP_PUT;
1478         info.mi_portal  = hdr->msg.put.ptl_index;
1479         info.mi_rlength = hdr->payload_length;
1480         info.mi_roffset = hdr->msg.put.offset;
1481         info.mi_mbits   = hdr->msg.put.match_bits;
1482
1483         msg->msg_rx_ready_delay = !ni->ni_lnd->lnd_eager_recv;
1484
1485  again:
1486         rc = lnet_ptl_match_md(&info, msg);
1487         switch (rc) {
1488         default:
1489                 LBUG();
1490
1491         case LNET_MATCHMD_OK:
1492                 lnet_recv_put(ni, msg);
1493                 return 0;
1494
1495         case LNET_MATCHMD_NONE:
1496                 if (msg->msg_rx_delayed) /* attached on delayed list */
1497                         return 0;
1498
1499                 rc = lnet_ni_eager_recv(ni, msg);
1500                 if (!rc)
1501                         goto again;
1502                 /* fall through */
1503
1504         case LNET_MATCHMD_DROP:
1505                 CNETERR("Dropping PUT from %s portal %d match %llu offset %d length %d: %d\n",
1506                         libcfs_id2str(info.mi_id), info.mi_portal,
1507                         info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
1508
1509                 return ENOENT;  /* +ve: OK but no match */
1510         }
1511 }
1512
1513 static int
1514 lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1515 {
1516         struct lnet_match_info info;
1517         lnet_hdr_t *hdr = &msg->msg_hdr;
1518         lnet_handle_wire_t reply_wmd;
1519         int rc;
1520
1521         /* Convert get fields to host byte order */
1522         hdr->msg.get.match_bits  = le64_to_cpu(hdr->msg.get.match_bits);
1523         hdr->msg.get.ptl_index   = le32_to_cpu(hdr->msg.get.ptl_index);
1524         hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1525         hdr->msg.get.src_offset  = le32_to_cpu(hdr->msg.get.src_offset);
1526
1527         info.mi_id.nid  = hdr->src_nid;
1528         info.mi_id.pid  = hdr->src_pid;
1529         info.mi_opc     = LNET_MD_OP_GET;
1530         info.mi_portal  = hdr->msg.get.ptl_index;
1531         info.mi_rlength = hdr->msg.get.sink_length;
1532         info.mi_roffset = hdr->msg.get.src_offset;
1533         info.mi_mbits   = hdr->msg.get.match_bits;
1534
1535         rc = lnet_ptl_match_md(&info, msg);
1536         if (rc == LNET_MATCHMD_DROP) {
1537                 CNETERR("Dropping GET from %s portal %d match %llu offset %d length %d\n",
1538                         libcfs_id2str(info.mi_id), info.mi_portal,
1539                         info.mi_mbits, info.mi_roffset, info.mi_rlength);
1540                 return ENOENT;  /* +ve: OK but no match */
1541         }
1542
1543         LASSERT(rc == LNET_MATCHMD_OK);
1544
1545         lnet_build_msg_event(msg, LNET_EVENT_GET);
1546
1547         reply_wmd = hdr->msg.get.return_wmd;
1548
1549         lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
1550                        msg->msg_offset, msg->msg_wanted);
1551
1552         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1553
1554         if (rdma_get) {
1555                 /* The LND completes the REPLY from her recv procedure */
1556                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1557                              msg->msg_offset, msg->msg_len, msg->msg_len);
1558                 return 0;
1559         }
1560
1561         lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1562         msg->msg_receiving = 0;
1563
1564         rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
1565         if (rc < 0) {
1566                 /* didn't get as far as lnet_ni_send() */
1567                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1568                        libcfs_nid2str(ni->ni_nid),
1569                        libcfs_id2str(info.mi_id), rc);
1570
1571                 lnet_finalize(ni, msg, rc);
1572         }
1573
1574         return 0;
1575 }
1576
1577 static int
1578 lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1579 {
1580         void *private = msg->msg_private;
1581         lnet_hdr_t *hdr = &msg->msg_hdr;
1582         lnet_process_id_t src = {0};
1583         lnet_libmd_t *md;
1584         int rlength;
1585         int mlength;
1586         int cpt;
1587
1588         cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
1589         lnet_res_lock(cpt);
1590
1591         src.nid = hdr->src_nid;
1592         src.pid = hdr->src_pid;
1593
1594         /* NB handles only looked up by creator (no flips) */
1595         md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1596         if (!md || !md->md_threshold || md->md_me) {
1597                 CNETERR("%s: Dropping REPLY from %s for %s MD %#llx.%#llx\n",
1598                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1599                         !md ? "invalid" : "inactive",
1600                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
1601                         hdr->msg.reply.dst_wmd.wh_object_cookie);
1602                 if (md && md->md_me)
1603                         CERROR("REPLY MD also attached to portal %d\n",
1604                                md->md_me->me_portal);
1605
1606                 lnet_res_unlock(cpt);
1607                 return ENOENT;            /* +ve: OK but no match */
1608         }
1609
1610         LASSERT(!md->md_offset);
1611
1612         rlength = hdr->payload_length;
1613         mlength = min_t(uint, rlength, md->md_length);
1614
1615         if (mlength < rlength &&
1616             !(md->md_options & LNET_MD_TRUNCATE)) {
1617                 CNETERR("%s: Dropping REPLY from %s length %d for MD %#llx would overflow (%d)\n",
1618                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1619                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1620                         mlength);
1621                 lnet_res_unlock(cpt);
1622                 return ENOENT;    /* +ve: OK but no match */
1623         }
1624
1625         CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
1626                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1627                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1628
1629         lnet_msg_attach_md(msg, md, 0, mlength);
1630
1631         if (mlength)
1632                 lnet_setpayloadbuffer(msg);
1633
1634         lnet_res_unlock(cpt);
1635
1636         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
1637
1638         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1639         return 0;
1640 }
1641
1642 static int
1643 lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1644 {
1645         lnet_hdr_t *hdr = &msg->msg_hdr;
1646         lnet_process_id_t src = {0};
1647         lnet_libmd_t *md;
1648         int cpt;
1649
1650         src.nid = hdr->src_nid;
1651         src.pid = hdr->src_pid;
1652
1653         /* Convert ack fields to host byte order */
1654         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1655         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1656
1657         cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
1658         lnet_res_lock(cpt);
1659
1660         /* NB handles only looked up by creator (no flips) */
1661         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1662         if (!md || !md->md_threshold || md->md_me) {
1663                 /* Don't moan; this is expected */
1664                 CDEBUG(D_NET,
1665                        "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
1666                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1667                        !md ? "invalid" : "inactive",
1668                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
1669                        hdr->msg.ack.dst_wmd.wh_object_cookie);
1670                 if (md && md->md_me)
1671                         CERROR("Source MD also attached to portal %d\n",
1672                                md->md_me->me_portal);
1673
1674                 lnet_res_unlock(cpt);
1675                 return ENOENT;            /* +ve! */
1676         }
1677
1678         CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
1679                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1680                hdr->msg.ack.dst_wmd.wh_object_cookie);
1681
1682         lnet_msg_attach_md(msg, md, 0, 0);
1683
1684         lnet_res_unlock(cpt);
1685
1686         lnet_build_msg_event(msg, LNET_EVENT_ACK);
1687
1688         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1689         return 0;
1690 }
1691
1692 /**
1693  * \retval LNET_CREDIT_OK       If \a msg is forwarded
1694  * \retval LNET_CREDIT_WAIT     If \a msg is blocked because w/o buffer
1695  * \retval -ve                  error code
1696  */
1697 static int
1698 lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
1699 {
1700         int rc = 0;
1701
1702         if (!the_lnet.ln_routing)
1703                 return -ECANCELED;
1704
1705         if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
1706             lnet_msg2bufpool(msg)->rbp_credits <= 0) {
1707                 if (!ni->ni_lnd->lnd_eager_recv) {
1708                         msg->msg_rx_ready_delay = 1;
1709                 } else {
1710                         lnet_net_unlock(msg->msg_rx_cpt);
1711                         rc = lnet_ni_eager_recv(ni, msg);
1712                         lnet_net_lock(msg->msg_rx_cpt);
1713                 }
1714         }
1715
1716         if (!rc)
1717                 rc = lnet_post_routed_recv_locked(msg, 0);
1718         return rc;
1719 }
1720
1721 char *
1722 lnet_msgtyp2str(int type)
1723 {
1724         switch (type) {
1725         case LNET_MSG_ACK:
1726                 return "ACK";
1727         case LNET_MSG_PUT:
1728                 return "PUT";
1729         case LNET_MSG_GET:
1730                 return "GET";
1731         case LNET_MSG_REPLY:
1732                 return "REPLY";
1733         case LNET_MSG_HELLO:
1734                 return "HELLO";
1735         default:
1736                 return "<UNKNOWN>";
1737         }
1738 }
1739
1740 void
1741 lnet_print_hdr(lnet_hdr_t *hdr)
1742 {
1743         lnet_process_id_t src = {0};
1744         lnet_process_id_t dst = {0};
1745         char *type_str = lnet_msgtyp2str(hdr->type);
1746
1747         src.nid = hdr->src_nid;
1748         src.pid = hdr->src_pid;
1749
1750         dst.nid = hdr->dest_nid;
1751         dst.pid = hdr->dest_pid;
1752
1753         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1754         CWARN("    From %s\n", libcfs_id2str(src));
1755         CWARN("    To   %s\n", libcfs_id2str(dst));
1756
1757         switch (hdr->type) {
1758         default:
1759                 break;
1760
1761         case LNET_MSG_PUT:
1762                 CWARN("    Ptl index %d, ack md %#llx.%#llx, match bits %llu\n",
1763                       hdr->msg.put.ptl_index,
1764                       hdr->msg.put.ack_wmd.wh_interface_cookie,
1765                       hdr->msg.put.ack_wmd.wh_object_cookie,
1766                       hdr->msg.put.match_bits);
1767                 CWARN("    Length %d, offset %d, hdr data %#llx\n",
1768                       hdr->payload_length, hdr->msg.put.offset,
1769                       hdr->msg.put.hdr_data);
1770                 break;
1771
1772         case LNET_MSG_GET:
1773                 CWARN("    Ptl index %d, return md %#llx.%#llx, match bits %llu\n",
1774                       hdr->msg.get.ptl_index,
1775                       hdr->msg.get.return_wmd.wh_interface_cookie,
1776                       hdr->msg.get.return_wmd.wh_object_cookie,
1777                       hdr->msg.get.match_bits);
1778                 CWARN("    Length %d, src offset %d\n",
1779                       hdr->msg.get.sink_length,
1780                       hdr->msg.get.src_offset);
1781                 break;
1782
1783         case LNET_MSG_ACK:
1784                 CWARN("    dst md %#llx.%#llx, manipulated length %d\n",
1785                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
1786                       hdr->msg.ack.dst_wmd.wh_object_cookie,
1787                       hdr->msg.ack.mlength);
1788                 break;
1789
1790         case LNET_MSG_REPLY:
1791                 CWARN("    dst md %#llx.%#llx, length %d\n",
1792                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
1793                       hdr->msg.reply.dst_wmd.wh_object_cookie,
1794                       hdr->payload_length);
1795         }
1796 }
1797
1798 int
1799 lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
1800            void *private, int rdma_req)
1801 {
1802         int rc = 0;
1803         int cpt;
1804         int for_me;
1805         struct lnet_msg *msg;
1806         lnet_pid_t dest_pid;
1807         lnet_nid_t dest_nid;
1808         lnet_nid_t src_nid;
1809         __u32 payload_length;
1810         __u32 type;
1811
1812         LASSERT(!in_interrupt());
1813
1814         type = le32_to_cpu(hdr->type);
1815         src_nid = le64_to_cpu(hdr->src_nid);
1816         dest_nid = le64_to_cpu(hdr->dest_nid);
1817         dest_pid = le32_to_cpu(hdr->dest_pid);
1818         payload_length = le32_to_cpu(hdr->payload_length);
1819
1820         for_me = (ni->ni_nid == dest_nid);
1821         cpt = lnet_cpt_of_nid(from_nid);
1822
1823         switch (type) {
1824         case LNET_MSG_ACK:
1825         case LNET_MSG_GET:
1826                 if (payload_length > 0) {
1827                         CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1828                                libcfs_nid2str(from_nid),
1829                                libcfs_nid2str(src_nid),
1830                                lnet_msgtyp2str(type), payload_length);
1831                         return -EPROTO;
1832                 }
1833                 break;
1834
1835         case LNET_MSG_PUT:
1836         case LNET_MSG_REPLY:
1837                 if (payload_length >
1838                    (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1839                         CERROR("%s, src %s: bad %s payload %d (%d max expected)\n",
1840                                libcfs_nid2str(from_nid),
1841                                libcfs_nid2str(src_nid),
1842                                lnet_msgtyp2str(type),
1843                                payload_length,
1844                                for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1845                         return -EPROTO;
1846                 }
1847                 break;
1848
1849         default:
1850                 CERROR("%s, src %s: Bad message type 0x%x\n",
1851                        libcfs_nid2str(from_nid),
1852                        libcfs_nid2str(src_nid), type);
1853                 return -EPROTO;
1854         }
1855
1856         if (the_lnet.ln_routing &&
1857             ni->ni_last_alive != ktime_get_real_seconds()) {
1858                 /* NB: so far here is the only place to set NI status to "up */
1859                 lnet_ni_lock(ni);
1860                 ni->ni_last_alive = ktime_get_real_seconds();
1861                 if (ni->ni_status &&
1862                     ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
1863                         ni->ni_status->ns_status = LNET_NI_STATUS_UP;
1864                 lnet_ni_unlock(ni);
1865         }
1866
1867         /*
1868          * Regard a bad destination NID as a protocol error.  Senders should
1869          * know what they're doing; if they don't they're misconfigured, buggy
1870          * or malicious so we chop them off at the knees :)
1871          */
1872         if (!for_me) {
1873                 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
1874                         /* should have gone direct */
1875                         CERROR("%s, src %s: Bad dest nid %s (should have been sent direct)\n",
1876                                libcfs_nid2str(from_nid),
1877                                libcfs_nid2str(src_nid),
1878                                libcfs_nid2str(dest_nid));
1879                         return -EPROTO;
1880                 }
1881
1882                 if (lnet_islocalnid(dest_nid)) {
1883                         /*
1884                          * dest is another local NI; sender should have used
1885                          * this node's NID on its own network
1886                          */
1887                         CERROR("%s, src %s: Bad dest nid %s (it's my nid but on a different network)\n",
1888                                libcfs_nid2str(from_nid),
1889                                libcfs_nid2str(src_nid),
1890                                libcfs_nid2str(dest_nid));
1891                         return -EPROTO;
1892                 }
1893
1894                 if (rdma_req && type == LNET_MSG_GET) {
1895                         CERROR("%s, src %s: Bad optimized GET for %s (final destination must be me)\n",
1896                                libcfs_nid2str(from_nid),
1897                                libcfs_nid2str(src_nid),
1898                                libcfs_nid2str(dest_nid));
1899                         return -EPROTO;
1900                 }
1901
1902                 if (!the_lnet.ln_routing) {
1903                         CERROR("%s, src %s: Dropping message for %s (routing not enabled)\n",
1904                                libcfs_nid2str(from_nid),
1905                                libcfs_nid2str(src_nid),
1906                                libcfs_nid2str(dest_nid));
1907                         goto drop;
1908                 }
1909         }
1910
1911         /*
1912          * Message looks OK; we're not going to return an error, so we MUST
1913          * call back lnd_recv() come what may...
1914          */
1915         if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
1916             fail_peer(src_nid, 0)) {         /* shall we now? */
1917                 CERROR("%s, src %s: Dropping %s to simulate failure\n",
1918                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1919                        lnet_msgtyp2str(type));
1920                 goto drop;
1921         }
1922
1923         msg = lnet_msg_alloc();
1924         if (!msg) {
1925                 CERROR("%s, src %s: Dropping %s (out of memory)\n",
1926                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1927                        lnet_msgtyp2str(type));
1928                 goto drop;
1929         }
1930
1931         /* msg zeroed in lnet_msg_alloc;
1932          * i.e. flags all clear, pointers NULL etc
1933          */
1934         msg->msg_type = type;
1935         msg->msg_private = private;
1936         msg->msg_receiving = 1;
1937         msg->msg_wanted = payload_length;
1938         msg->msg_len = payload_length;
1939         msg->msg_offset = 0;
1940         msg->msg_hdr = *hdr;
1941         /* for building message event */
1942         msg->msg_from = from_nid;
1943         if (!for_me) {
1944                 msg->msg_target.pid     = dest_pid;
1945                 msg->msg_target.nid     = dest_nid;
1946                 msg->msg_routing        = 1;
1947
1948         } else {
1949                 /* convert common msg->hdr fields to host byteorder */
1950                 msg->msg_hdr.type       = type;
1951                 msg->msg_hdr.src_nid    = src_nid;
1952                 msg->msg_hdr.src_pid    = le32_to_cpu(msg->msg_hdr.src_pid);
1953                 msg->msg_hdr.dest_nid   = dest_nid;
1954                 msg->msg_hdr.dest_pid   = dest_pid;
1955                 msg->msg_hdr.payload_length = payload_length;
1956         }
1957
1958         lnet_net_lock(cpt);
1959         rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
1960         if (rc) {
1961                 lnet_net_unlock(cpt);
1962                 CERROR("%s, src %s: Dropping %s (error %d looking up sender)\n",
1963                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1964                        lnet_msgtyp2str(type), rc);
1965                 lnet_msg_free(msg);
1966                 goto drop;
1967         }
1968
1969         if (lnet_isrouter(msg->msg_rxpeer)) {
1970                 lnet_peer_set_alive(msg->msg_rxpeer);
1971                 if (avoid_asym_router_failure &&
1972                     LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
1973                         /* received a remote message from router, update
1974                          * remote NI status on this router.
1975                          * NB: multi-hop routed message will be ignored.
1976                          */
1977                         lnet_router_ni_update_locked(msg->msg_rxpeer,
1978                                                      LNET_NIDNET(src_nid));
1979                 }
1980         }
1981
1982         lnet_msg_commit(msg, cpt);
1983
1984         if (!for_me) {
1985                 rc = lnet_parse_forward_locked(ni, msg);
1986                 lnet_net_unlock(cpt);
1987
1988                 if (rc < 0)
1989                         goto free_drop;
1990
1991                 if (rc == LNET_CREDIT_OK) {
1992                         lnet_ni_recv(ni, msg->msg_private, msg, 0,
1993                                      0, payload_length, payload_length);
1994                 }
1995                 return 0;
1996         }
1997
1998         lnet_net_unlock(cpt);
1999
2000         switch (type) {
2001         case LNET_MSG_ACK:
2002                 rc = lnet_parse_ack(ni, msg);
2003                 break;
2004         case LNET_MSG_PUT:
2005                 rc = lnet_parse_put(ni, msg);
2006                 break;
2007         case LNET_MSG_GET:
2008                 rc = lnet_parse_get(ni, msg, rdma_req);
2009                 break;
2010         case LNET_MSG_REPLY:
2011                 rc = lnet_parse_reply(ni, msg);
2012                 break;
2013         default:
2014                 LASSERT(0);
2015                 rc = -EPROTO;
2016                 goto free_drop;  /* prevent an unused label if !kernel */
2017         }
2018
2019         if (!rc)
2020                 return 0;
2021
2022         LASSERT(rc == ENOENT);
2023
2024  free_drop:
2025         LASSERT(!msg->msg_md);
2026         lnet_finalize(ni, msg, rc);
2027
2028  drop:
2029         lnet_drop_message(ni, cpt, private, payload_length);
2030         return 0;
2031 }
2032 EXPORT_SYMBOL(lnet_parse);
2033
2034 void
2035 lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
2036 {
2037         while (!list_empty(head)) {
2038                 lnet_process_id_t id = {0};
2039                 lnet_msg_t *msg;
2040
2041                 msg = list_entry(head->next, lnet_msg_t, msg_list);
2042                 list_del(&msg->msg_list);
2043
2044                 id.nid = msg->msg_hdr.src_nid;
2045                 id.pid = msg->msg_hdr.src_pid;
2046
2047                 LASSERT(!msg->msg_md);
2048                 LASSERT(msg->msg_rx_delayed);
2049                 LASSERT(msg->msg_rxpeer);
2050                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2051
2052                 CWARN("Dropping delayed PUT from %s portal %d match %llu offset %d length %d: %s\n",
2053                       libcfs_id2str(id),
2054                       msg->msg_hdr.msg.put.ptl_index,
2055                       msg->msg_hdr.msg.put.match_bits,
2056                       msg->msg_hdr.msg.put.offset,
2057                       msg->msg_hdr.payload_length, reason);
2058
2059                 /*
2060                  * NB I can't drop msg's ref on msg_rxpeer until after I've
2061                  * called lnet_drop_message(), so I just hang onto msg as well
2062                  * until that's done
2063                  */
2064                 lnet_drop_message(msg->msg_rxpeer->lp_ni,
2065                                   msg->msg_rxpeer->lp_cpt,
2066                                   msg->msg_private, msg->msg_len);
2067                 /*
2068                  * NB: message will not generate event because w/o attached MD,
2069                  * but we still should give error code so lnet_msg_decommit()
2070                  * can skip counters operations and other checks.
2071                  */
2072                 lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
2073         }
2074 }
2075
2076 void
2077 lnet_recv_delayed_msg_list(struct list_head *head)
2078 {
2079         while (!list_empty(head)) {
2080                 lnet_msg_t *msg;
2081                 lnet_process_id_t id;
2082
2083                 msg = list_entry(head->next, lnet_msg_t, msg_list);
2084                 list_del(&msg->msg_list);
2085
2086                 /*
2087                  * md won't disappear under me, since each msg
2088                  * holds a ref on it
2089                  */
2090                 id.nid = msg->msg_hdr.src_nid;
2091                 id.pid = msg->msg_hdr.src_pid;
2092
2093                 LASSERT(msg->msg_rx_delayed);
2094                 LASSERT(msg->msg_md);
2095                 LASSERT(msg->msg_rxpeer);
2096                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2097
2098                 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
2099                        libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
2100                        msg->msg_hdr.msg.put.match_bits,
2101                        msg->msg_hdr.msg.put.offset,
2102                        msg->msg_hdr.payload_length);
2103
2104                 lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
2105         }
2106 }
2107
2108 /**
2109  * Initiate an asynchronous PUT operation.
2110  *
2111  * There are several events associated with a PUT: completion of the send on
2112  * the initiator node (LNET_EVENT_SEND), and when the send completes
2113  * successfully, the receipt of an acknowledgment (LNET_EVENT_ACK) indicating
2114  * that the operation was accepted by the target. The event LNET_EVENT_PUT is
2115  * used at the target node to indicate the completion of incoming data
2116  * delivery.
2117  *
2118  * The local events will be logged in the EQ associated with the MD pointed to
2119  * by \a mdh handle. Using a MD without an associated EQ results in these
2120  * events being discarded. In this case, the caller must have another
2121  * mechanism (e.g., a higher level protocol) for determining when it is safe
2122  * to modify the memory region associated with the MD.
2123  *
2124  * Note that LNet does not guarantee the order of LNET_EVENT_SEND and
2125  * LNET_EVENT_ACK, though intuitively ACK should happen after SEND.
2126  *
2127  * \param self Indicates the NID of a local interface through which to send
2128  * the PUT request. Use LNET_NID_ANY to let LNet choose one by itself.
2129  * \param mdh A handle for the MD that describes the memory to be sent. The MD
2130  * must be "free floating" (See LNetMDBind()).
2131  * \param ack Controls whether an acknowledgment is requested.
2132  * Acknowledgments are only sent when they are requested by the initiating
2133  * process and the target MD enables them.
2134  * \param target A process identifier for the target process.
2135  * \param portal The index in the \a target's portal table.
2136  * \param match_bits The match bits to use for MD selection at the target
2137  * process.
2138  * \param offset The offset into the target MD (only used when the target
2139  * MD has the LNET_MD_MANAGE_REMOTE option set).
2140  * \param hdr_data 64 bits of user data that can be included in the message
2141  * header. This data is written to an event queue entry at the target if an
2142  * EQ is present on the matching MD.
2143  *
2144  * \retval  0      Success, and only in this case events will be generated
2145  * and logged to EQ (if it exists).
2146  * \retval -EIO    Simulated failure.
2147  * \retval -ENOMEM Memory allocation failure.
2148  * \retval -ENOENT Invalid MD object.
2149  *
2150  * \see lnet_event_t::hdr_data and lnet_event_kind_t.
2151  */
2152 int
2153 LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
2154         lnet_process_id_t target, unsigned int portal,
2155         __u64 match_bits, unsigned int offset,
2156         __u64 hdr_data)
2157 {
2158         struct lnet_msg *msg;
2159         struct lnet_libmd *md;
2160         int cpt;
2161         int rc;
2162
2163         LASSERT(the_lnet.ln_refcount > 0);
2164
2165         if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2166             fail_peer(target.nid, 1)) { /* shall we now? */
2167                 CERROR("Dropping PUT to %s: simulated failure\n",
2168                        libcfs_id2str(target));
2169                 return -EIO;
2170         }
2171
2172         msg = lnet_msg_alloc();
2173         if (!msg) {
2174                 CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2175                        libcfs_id2str(target));
2176                 return -ENOMEM;
2177         }
2178         msg->msg_vmflush = !!memory_pressure_get();
2179
2180         cpt = lnet_cpt_of_cookie(mdh.cookie);
2181         lnet_res_lock(cpt);
2182
2183         md = lnet_handle2md(&mdh);
2184         if (!md || !md->md_threshold || md->md_me) {
2185                 CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
2186                        match_bits, portal, libcfs_id2str(target),
2187                        !md ? -1 : md->md_threshold);
2188                 if (md && md->md_me)
2189                         CERROR("Source MD also attached to portal %d\n",
2190                                md->md_me->me_portal);
2191                 lnet_res_unlock(cpt);
2192
2193                 lnet_msg_free(msg);
2194                 return -ENOENT;
2195         }
2196
2197         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2198
2199         lnet_msg_attach_md(msg, md, 0, 0);
2200
2201         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2202
2203         msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2204         msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2205         msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2206         msg->msg_hdr.msg.put.hdr_data = hdr_data;
2207
2208         /* NB handles only looked up by creator (no flips) */
2209         if (ack == LNET_ACK_REQ) {
2210                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2211                         the_lnet.ln_interface_cookie;
2212                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2213                         md->md_lh.lh_cookie;
2214         } else {
2215                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2216                         LNET_WIRE_HANDLE_COOKIE_NONE;
2217                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2218                         LNET_WIRE_HANDLE_COOKIE_NONE;
2219         }
2220
2221         lnet_res_unlock(cpt);
2222
2223         lnet_build_msg_event(msg, LNET_EVENT_SEND);
2224
2225         rc = lnet_send(self, msg, LNET_NID_ANY);
2226         if (rc) {
2227                 CNETERR("Error sending PUT to %s: %d\n",
2228                         libcfs_id2str(target), rc);
2229                 lnet_finalize(NULL, msg, rc);
2230         }
2231
2232         /* completion will be signalled by an event */
2233         return 0;
2234 }
2235 EXPORT_SYMBOL(LNetPut);
2236
2237 lnet_msg_t *
2238 lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2239 {
2240         /*
2241          * The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
2242          * returns a msg for the LND to pass to lnet_finalize() when the sink
2243          * data has been received.
2244          *
2245          * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
2246          * lnet_finalize() is called on it, so the LND must call this first
2247          */
2248         struct lnet_msg *msg = lnet_msg_alloc();
2249         struct lnet_libmd *getmd = getmsg->msg_md;
2250         lnet_process_id_t peer_id = getmsg->msg_target;
2251         int cpt;
2252
2253         LASSERT(!getmsg->msg_target_is_router);
2254         LASSERT(!getmsg->msg_routing);
2255
2256         if (!msg) {
2257                 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2258                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2259                 goto drop;
2260         }
2261
2262         cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2263         lnet_res_lock(cpt);
2264
2265         LASSERT(getmd->md_refcount > 0);
2266
2267         if (!getmd->md_threshold) {
2268                 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2269                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2270                        getmd);
2271                 lnet_res_unlock(cpt);
2272                 goto drop;
2273         }
2274
2275         LASSERT(!getmd->md_offset);
2276
2277         CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2278                libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2279
2280         /* setup information for lnet_build_msg_event */
2281         msg->msg_from = peer_id.nid;
2282         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
2283         msg->msg_hdr.src_nid = peer_id.nid;
2284         msg->msg_hdr.payload_length = getmd->md_length;
2285         msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
2286
2287         lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2288         lnet_res_unlock(cpt);
2289
2290         cpt = lnet_cpt_of_nid(peer_id.nid);
2291
2292         lnet_net_lock(cpt);
2293         lnet_msg_commit(msg, cpt);
2294         lnet_net_unlock(cpt);
2295
2296         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2297
2298         return msg;
2299
2300  drop:
2301         cpt = lnet_cpt_of_nid(peer_id.nid);
2302
2303         lnet_net_lock(cpt);
2304         the_lnet.ln_counters[cpt]->drop_count++;
2305         the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2306         lnet_net_unlock(cpt);
2307
2308         if (msg)
2309                 lnet_msg_free(msg);
2310
2311         return NULL;
2312 }
2313 EXPORT_SYMBOL(lnet_create_reply_msg);
2314
2315 void
2316 lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2317 {
2318         /*
2319          * Set the REPLY length, now the RDMA that elides the REPLY message has
2320          * completed and I know it.
2321          */
2322         LASSERT(reply);
2323         LASSERT(reply->msg_type == LNET_MSG_GET);
2324         LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
2325
2326         /*
2327          * NB I trusted my peer to RDMA.  If she tells me she's written beyond
2328          * the end of my buffer, I might as well be dead.
2329          */
2330         LASSERT(len <= reply->msg_ev.mlength);
2331
2332         reply->msg_ev.mlength = len;
2333 }
2334 EXPORT_SYMBOL(lnet_set_reply_msg_len);
2335
2336 /**
2337  * Initiate an asynchronous GET operation.
2338  *
2339  * On the initiator node, an LNET_EVENT_SEND is logged when the GET request
2340  * is sent, and an LNET_EVENT_REPLY is logged when the data returned from
2341  * the target node in the REPLY has been written to local MD.
2342  *
2343  * On the target node, an LNET_EVENT_GET is logged when the GET request
2344  * arrives and is accepted into a MD.
2345  *
2346  * \param self,target,portal,match_bits,offset See the discussion in LNetPut().
2347  * \param mdh A handle for the MD that describes the memory into which the
2348  * requested data will be received. The MD must be "free floating"
2349  * (See LNetMDBind()).
2350  *
2351  * \retval  0      Success, and only in this case events will be generated
2352  * and logged to EQ (if it exists) of the MD.
2353  * \retval -EIO    Simulated failure.
2354  * \retval -ENOMEM Memory allocation failure.
2355  * \retval -ENOENT Invalid MD object.
2356  */
2357 int
2358 LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
2359         lnet_process_id_t target, unsigned int portal,
2360         __u64 match_bits, unsigned int offset)
2361 {
2362         struct lnet_msg *msg;
2363         struct lnet_libmd *md;
2364         int cpt;
2365         int rc;
2366
2367         LASSERT(the_lnet.ln_refcount > 0);
2368
2369         if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
2370             fail_peer(target.nid, 1)) {   /* shall we now? */
2371                 CERROR("Dropping GET to %s: simulated failure\n",
2372                        libcfs_id2str(target));
2373                 return -EIO;
2374         }
2375
2376         msg = lnet_msg_alloc();
2377         if (!msg) {
2378                 CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2379                        libcfs_id2str(target));
2380                 return -ENOMEM;
2381         }
2382
2383         cpt = lnet_cpt_of_cookie(mdh.cookie);
2384         lnet_res_lock(cpt);
2385
2386         md = lnet_handle2md(&mdh);
2387         if (!md || !md->md_threshold || md->md_me) {
2388                 CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
2389                        match_bits, portal, libcfs_id2str(target),
2390                        !md ? -1 : md->md_threshold);
2391                 if (md && md->md_me)
2392                         CERROR("REPLY MD also attached to portal %d\n",
2393                                md->md_me->me_portal);
2394
2395                 lnet_res_unlock(cpt);
2396
2397                 lnet_msg_free(msg);
2398                 return -ENOENT;
2399         }
2400
2401         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2402
2403         lnet_msg_attach_md(msg, md, 0, 0);
2404
2405         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2406
2407         msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2408         msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2409         msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2410         msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2411
2412         /* NB handles only looked up by creator (no flips) */
2413         msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
2414                 the_lnet.ln_interface_cookie;
2415         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
2416                 md->md_lh.lh_cookie;
2417
2418         lnet_res_unlock(cpt);
2419
2420         lnet_build_msg_event(msg, LNET_EVENT_SEND);
2421
2422         rc = lnet_send(self, msg, LNET_NID_ANY);
2423         if (rc < 0) {
2424                 CNETERR("Error sending GET to %s: %d\n",
2425                         libcfs_id2str(target), rc);
2426                 lnet_finalize(NULL, msg, rc);
2427         }
2428
2429         /* completion will be signalled by an event */
2430         return 0;
2431 }
2432 EXPORT_SYMBOL(LNetGet);
2433
2434 /**
2435  * Calculate distance to node at \a dstnid.
2436  *
2437  * \param dstnid Target NID.
2438  * \param srcnidp If not NULL, NID of the local interface to reach \a dstnid
2439  * is saved here.
2440  * \param orderp If not NULL, order of the route to reach \a dstnid is saved
2441  * here.
2442  *
2443  * \retval 0 If \a dstnid belongs to a local interface, and reserved option
2444  * local_nid_dist_zero is set, which is the default.
2445  * \retval positives Distance to target NID, i.e. number of hops plus one.
2446  * \retval -EHOSTUNREACH If \a dstnid is not reachable.
2447  */
2448 int
2449 LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2450 {
2451         struct list_head *e;
2452         struct lnet_ni *ni;
2453         lnet_remotenet_t *rnet;
2454         __u32 dstnet = LNET_NIDNET(dstnid);
2455         int hops;
2456         int cpt;
2457         __u32 order = 2;
2458         struct list_head *rn_list;
2459
2460         /*
2461          * if !local_nid_dist_zero, I don't return a distance of 0 ever
2462          * (when lustre sees a distance of 0, it substitutes 0@lo), so I
2463          * keep order 0 free for 0@lo and order 1 free for a local NID
2464          * match
2465          */
2466         LASSERT(the_lnet.ln_refcount > 0);
2467
2468         cpt = lnet_net_lock_current();
2469
2470         list_for_each(e, &the_lnet.ln_nis) {
2471                 ni = list_entry(e, lnet_ni_t, ni_list);
2472
2473                 if (ni->ni_nid == dstnid) {
2474                         if (srcnidp)
2475                                 *srcnidp = dstnid;
2476                         if (orderp) {
2477                                 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2478                                         *orderp = 0;
2479                                 else
2480                                         *orderp = 1;
2481                         }
2482                         lnet_net_unlock(cpt);
2483
2484                         return local_nid_dist_zero ? 0 : 1;
2485                 }
2486
2487                 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2488                         if (srcnidp)
2489                                 *srcnidp = ni->ni_nid;
2490                         if (orderp)
2491                                 *orderp = order;
2492                         lnet_net_unlock(cpt);
2493                         return 1;
2494                 }
2495
2496                 order++;
2497         }
2498
2499         rn_list = lnet_net2rnethash(dstnet);
2500         list_for_each(e, rn_list) {
2501                 rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2502
2503                 if (rnet->lrn_net == dstnet) {
2504                         lnet_route_t *route;
2505                         lnet_route_t *shortest = NULL;
2506
2507                         LASSERT(!list_empty(&rnet->lrn_routes));
2508
2509                         list_for_each_entry(route, &rnet->lrn_routes,
2510                                             lr_list) {
2511                                 if (!shortest ||
2512                                     route->lr_hops < shortest->lr_hops)
2513                                         shortest = route;
2514                         }
2515
2516                         LASSERT(shortest);
2517                         hops = shortest->lr_hops;
2518                         if (srcnidp)
2519                                 *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
2520                         if (orderp)
2521                                 *orderp = order;
2522                         lnet_net_unlock(cpt);
2523                         return hops + 1;
2524                 }
2525                 order++;
2526         }
2527
2528         lnet_net_unlock(cpt);
2529         return -EHOSTUNREACH;
2530 }
2531 EXPORT_SYMBOL(LNetDist);