Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/sparc
[sfrench/cifs-2.6.git] / drivers / staging / lustre / lnet / lnet / net_fault.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * GPL HEADER START
4  *
5  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 2 only,
9  * as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License version 2 for more details (a copy is included
15  * in the LICENSE file that accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License
18  * version 2 along with this program; If not, see
19  * http://www.gnu.org/licenses/gpl-2.0.html
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2014, Intel Corporation.
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  * Lustre is a trademark of Seagate, Inc.
29  *
30  * lnet/lnet/net_fault.c
31  *
32  * Lustre network fault simulation
33  *
34  * Author: liang.zhen@intel.com
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38
39 #include <linux/lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
41
42 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
43                                  LNET_GET_BIT | LNET_REPLY_BIT)
44
/** A single message-drop rule; kept on the_lnet.ln_drop_rules. */
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule (src/dst NIDs, message/portal masks) */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.fs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * jiffies to drop the next message, it's exclusive with dr_drop_at
	 * (exactly one of the two schemes is active per rule)
	 */
	unsigned long		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	unsigned long		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
66
67 static bool
68 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
69 {
70         if (nid == msg_nid || nid == LNET_NID_ANY)
71                 return true;
72
73         if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
74                 return false;
75
76         /* 255.255.255.255@net is wildcard for all addresses in a network */
77         return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
78 }
79
80 static bool
81 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
82                       lnet_nid_t dst, unsigned int type, unsigned int portal)
83 {
84         if (!lnet_fault_nid_match(attr->fa_src, src) ||
85             !lnet_fault_nid_match(attr->fa_dst, dst))
86                 return false;
87
88         if (!(attr->fa_msg_mask & (1 << type)))
89                 return false;
90
91         /**
92          * NB: ACK and REPLY have no portal, but they should have been
93          * rejected by message mask
94          */
95         if (attr->fa_ptl_mask && /* has portal filter */
96             !(attr->fa_ptl_mask & (1ULL << portal)))
97                 return false;
98
99         return true;
100 }
101
102 static int
103 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
104 {
105         if (!attr->fa_msg_mask)
106                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
107
108         if (!attr->fa_ptl_mask) /* no portal filter */
109                 return 0;
110
111         /* NB: only PUT and GET can be filtered if portal filter has been set */
112         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
113         if (!attr->fa_msg_mask) {
114                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
115                        attr->fa_msg_mask);
116                 return -EINVAL;
117         }
118         return 0;
119 }
120
121 static void
122 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
123 {
124         /* NB: fs_counter is NOT updated by this function */
125         switch (type) {
126         case LNET_MSG_PUT:
127                 stat->fs_put++;
128                 return;
129         case LNET_MSG_ACK:
130                 stat->fs_ack++;
131                 return;
132         case LNET_MSG_GET:
133                 stat->fs_get++;
134                 return;
135         case LNET_MSG_REPLY:
136                 stat->fs_reply++;
137                 return;
138         }
139 }
140
141 /**
142  * LNet message drop simulation
143  */
144
145 /**
146  * Add a new drop rule to LNet
147  * There is no check for duplicated drop rule, all rules will be checked for
148  * incoming message.
149  */
150 static int
151 lnet_drop_rule_add(struct lnet_fault_attr *attr)
152 {
153         struct lnet_drop_rule *rule;
154
155         if (attr->u.drop.da_rate & attr->u.drop.da_interval) {
156                 CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
157                        attr->u.drop.da_rate, attr->u.drop.da_interval);
158                 return -EINVAL;
159         }
160
161         if (lnet_fault_attr_validate(attr))
162                 return -EINVAL;
163
164         CFS_ALLOC_PTR(rule);
165         if (!rule)
166                 return -ENOMEM;
167
168         spin_lock_init(&rule->dr_lock);
169
170         rule->dr_attr = *attr;
171         if (attr->u.drop.da_interval) {
172                 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
173                 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
174                                                     attr->u.drop.da_interval);
175         } else {
176                 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
177         }
178
179         lnet_net_lock(LNET_LOCK_EX);
180         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
181         lnet_net_unlock(LNET_LOCK_EX);
182
183         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
184                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
185                attr->u.drop.da_rate, attr->u.drop.da_interval);
186         return 0;
187 }
188
189 /**
190  * Remove matched drop rules from lnet, all rules that can match \a src and
191  * \a dst will be removed.
192  * If \a src is zero, then all rules have \a dst as destination will be remove
193  * If \a dst is zero, then all rules have \a src as source will be removed
194  * If both of them are zero, all rules will be removed
195  */
196 static int
197 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
198 {
199         struct lnet_drop_rule *rule;
200         struct lnet_drop_rule *tmp;
201         struct list_head zombies;
202         int n = 0;
203
204         INIT_LIST_HEAD(&zombies);
205
206         lnet_net_lock(LNET_LOCK_EX);
207         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
208                 if (rule->dr_attr.fa_src != src && src)
209                         continue;
210
211                 if (rule->dr_attr.fa_dst != dst && dst)
212                         continue;
213
214                 list_move(&rule->dr_link, &zombies);
215         }
216         lnet_net_unlock(LNET_LOCK_EX);
217
218         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
219                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
220                        libcfs_nid2str(rule->dr_attr.fa_src),
221                        libcfs_nid2str(rule->dr_attr.fa_dst),
222                        rule->dr_attr.u.drop.da_rate,
223                        rule->dr_attr.u.drop.da_interval);
224
225                 list_del(&rule->dr_link);
226                 CFS_FREE_PTR(rule);
227                 n++;
228         }
229
230         return n;
231 }
232
233 /**
234  * List drop rule at position of \a pos
235  */
236 static int
237 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
238                     struct lnet_fault_stat *stat)
239 {
240         struct lnet_drop_rule *rule;
241         int cpt;
242         int i = 0;
243         int rc = -ENOENT;
244
245         cpt = lnet_net_lock_current();
246         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
247                 if (i++ < pos)
248                         continue;
249
250                 spin_lock(&rule->dr_lock);
251                 *attr = rule->dr_attr;
252                 *stat = rule->dr_stat;
253                 spin_unlock(&rule->dr_lock);
254                 rc = 0;
255                 break;
256         }
257
258         lnet_net_unlock(cpt);
259         return rc;
260 }
261
262 /**
263  * reset counters for all drop rules
264  */
265 static void
266 lnet_drop_rule_reset(void)
267 {
268         struct lnet_drop_rule *rule;
269         int cpt;
270
271         cpt = lnet_net_lock_current();
272
273         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
274                 struct lnet_fault_attr *attr = &rule->dr_attr;
275
276                 spin_lock(&rule->dr_lock);
277
278                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
279                 if (attr->u.drop.da_rate) {
280                         rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
281                 } else {
282                         rule->dr_drop_time = cfs_time_shift(cfs_rand() %
283                                                 attr->u.drop.da_interval);
284                         rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
285                 }
286                 spin_unlock(&rule->dr_lock);
287         }
288
289         lnet_net_unlock(cpt);
290 }
291
292 /**
293  * check source/destination NID, portal, message type and drop rate,
294  * decide whether should drop this message or not
295  */
296 static bool
297 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
298                 lnet_nid_t dst, unsigned int type, unsigned int portal)
299 {
300         struct lnet_fault_attr *attr = &rule->dr_attr;
301         bool drop;
302
303         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
304                 return false;
305
306         /* match this rule, check drop rate now */
307         spin_lock(&rule->dr_lock);
308         if (rule->dr_drop_time) { /* time based drop */
309                 unsigned long now = cfs_time_current();
310
311                 rule->dr_stat.fs_count++;
312                 drop = cfs_time_aftereq(now, rule->dr_drop_time);
313                 if (drop) {
314                         if (cfs_time_after(now, rule->dr_time_base))
315                                 rule->dr_time_base = now;
316
317                         rule->dr_drop_time = rule->dr_time_base +
318                                              cfs_time_seconds(cfs_rand() %
319                                                 attr->u.drop.da_interval);
320                         rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
321
322                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
323                                libcfs_nid2str(attr->fa_src),
324                                libcfs_nid2str(attr->fa_dst),
325                                rule->dr_drop_time);
326                 }
327
328         } else { /* rate based drop */
329                 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
330
331                 if (!do_div(rule->dr_stat.fs_count, attr->u.drop.da_rate)) {
332                         rule->dr_drop_at = rule->dr_stat.fs_count +
333                                            cfs_rand() % attr->u.drop.da_rate;
334                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
335                                libcfs_nid2str(attr->fa_src),
336                                libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
337                 }
338         }
339
340         if (drop) { /* drop this message, update counters */
341                 lnet_fault_stat_inc(&rule->dr_stat, type);
342                 rule->dr_stat.u.drop.ds_dropped++;
343         }
344
345         spin_unlock(&rule->dr_lock);
346         return drop;
347 }
348
349 /**
350  * Check if message from \a src to \a dst can match any existed drop rule
351  */
352 bool
353 lnet_drop_rule_match(struct lnet_hdr *hdr)
354 {
355         struct lnet_drop_rule *rule;
356         lnet_nid_t src = le64_to_cpu(hdr->src_nid);
357         lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
358         unsigned int typ = le32_to_cpu(hdr->type);
359         unsigned int ptl = -1;
360         bool drop = false;
361         int cpt;
362
363         /**
364          * NB: if Portal is specified, then only PUT and GET will be
365          * filtered by drop rule
366          */
367         if (typ == LNET_MSG_PUT)
368                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
369         else if (typ == LNET_MSG_GET)
370                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
371
372         cpt = lnet_net_lock_current();
373         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
374                 drop = drop_rule_match(rule, src, dst, typ, ptl);
375                 if (drop)
376                         break;
377         }
378
379         lnet_net_unlock(cpt);
380         return drop;
381 }
382
383 /**
384  * LNet Delay Simulation
385  */
386 /** timestamp (second) to send delayed message */
387 #define msg_delay_send           msg_ev.hdr_data
388
/** A single message-delay rule; kept on the_lnet.ln_delay_rules. */
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * jiffies to delay the next message, it's exclusive with dl_delay_at
	 * (exactly one of the two schemes is active per rule)
	 */
	unsigned long		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	unsigned long		dl_time_base;
	/** jiffies to send the next delayed message; -1 when none queued */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
420
/** Shared state of the single delay daemon thread (see delay_dd). */
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here waiting for scheduled rules */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here for daemon start/stop */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};
437
438 static struct delay_daemon_data delay_dd;
439
/*
 * Round the absolute jiffies value \a timeout up to the next whole
 * second, returned in jiffies; used so queued delayed messages are
 * batched per second.
 * NOTE(review): cfs_time_sub(timeout, 0) presumably converts the
 * absolute jiffies value into a duration for cfs_duration_sec() —
 * confirm against the libcfs time primitives.
 */
static unsigned long
round_timeout(unsigned long timeout)
{
	return cfs_time_seconds((unsigned int)
			cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}
446
447 static void
448 delay_rule_decref(struct lnet_delay_rule *rule)
449 {
450         if (atomic_dec_and_test(&rule->dl_refcount)) {
451                 LASSERT(list_empty(&rule->dl_sched_link));
452                 LASSERT(list_empty(&rule->dl_msg_list));
453                 LASSERT(list_empty(&rule->dl_link));
454
455                 CFS_FREE_PTR(rule);
456         }
457 }
458
459 /**
460  * check source/destination NID, portal, message type and delay rate,
461  * decide whether should delay this message or not
462  */
463 static bool
464 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
465                  lnet_nid_t dst, unsigned int type, unsigned int portal,
466                  struct lnet_msg *msg)
467 {
468         struct lnet_fault_attr *attr = &rule->dl_attr;
469         bool delay;
470
471         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
472                 return false;
473
474         /* match this rule, check delay rate now */
475         spin_lock(&rule->dl_lock);
476         if (rule->dl_delay_time) { /* time based delay */
477                 unsigned long now = cfs_time_current();
478
479                 rule->dl_stat.fs_count++;
480                 delay = cfs_time_aftereq(now, rule->dl_delay_time);
481                 if (delay) {
482                         if (cfs_time_after(now, rule->dl_time_base))
483                                 rule->dl_time_base = now;
484
485                         rule->dl_delay_time = rule->dl_time_base +
486                                              cfs_time_seconds(cfs_rand() %
487                                                 attr->u.delay.la_interval);
488                         rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
489
490                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
491                                libcfs_nid2str(attr->fa_src),
492                                libcfs_nid2str(attr->fa_dst),
493                                rule->dl_delay_time);
494                 }
495
496         } else { /* rate based delay */
497                 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
498                 /* generate the next random rate sequence */
499                 if (!do_div(rule->dl_stat.fs_count, attr->u.delay.la_rate)) {
500                         rule->dl_delay_at = rule->dl_stat.fs_count +
501                                             cfs_rand() % attr->u.delay.la_rate;
502                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
503                                libcfs_nid2str(attr->fa_src),
504                                libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
505                 }
506         }
507
508         if (!delay) {
509                 spin_unlock(&rule->dl_lock);
510                 return false;
511         }
512
513         /* delay this message, update counters */
514         lnet_fault_stat_inc(&rule->dl_stat, type);
515         rule->dl_stat.u.delay.ls_delayed++;
516
517         list_add_tail(&msg->msg_list, &rule->dl_msg_list);
518         msg->msg_delay_send = round_timeout(
519                         cfs_time_shift(attr->u.delay.la_latency));
520         if (rule->dl_msg_send == -1) {
521                 rule->dl_msg_send = msg->msg_delay_send;
522                 mod_timer(&rule->dl_timer, rule->dl_msg_send);
523         }
524
525         spin_unlock(&rule->dl_lock);
526         return true;
527 }
528
529 /**
530  * check if \a msg can match any Delay Rule, receiving of this message
531  * will be delayed if there is a match.
532  */
533 bool
534 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
535 {
536         struct lnet_delay_rule *rule;
537         lnet_nid_t src = le64_to_cpu(hdr->src_nid);
538         lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
539         unsigned int typ = le32_to_cpu(hdr->type);
540         unsigned int ptl = -1;
541
542         /* NB: called with hold of lnet_net_lock */
543
544         /**
545          * NB: if Portal is specified, then only PUT and GET will be
546          * filtered by delay rule
547          */
548         if (typ == LNET_MSG_PUT)
549                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
550         else if (typ == LNET_MSG_GET)
551                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
552
553         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
554                 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
555                         return true;
556         }
557
558         return false;
559 }
560
/**
 * Check out delayed messages for send.
 *
 * Moves messages from \a rule's pending list onto \a msg_list: all of
 * them when \a all is true, otherwise only those whose send time has
 * expired. Re-arms or cancels the rule's timer according to what
 * remains queued.
 */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long now = cfs_time_current();

	/* cheap unlocked check: nothing due yet and we're not draining */
	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		/* list is presumably FIFO in send time (fixed latency on
		 * enqueue), so stop at the first message not yet due */
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		/* nothing pending: stop the timer and mark the rule idle */
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/*
		 * dequeued some timedout messages, update timer for the
		 * next delayed message on rule
		 */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}
598
/**
 * Deliver or discard all delayed messages on \a msg_list.
 *
 * When \a drop is true every message is dropped (finalized with
 * -ECANCELED); otherwise local messages are re-parsed and routed
 * messages are forwarded. Messages that fail processing are dropped
 * and finalized with the failure code.
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		/* delayed messages were captured on the receive path,
		 * so the rx peer must be set */
		LASSERT(msg->msg_rxpeer);

		ni = msg->msg_rxpeer->lp_ni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			/* shutting down / rule removal with drop semantics */
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			/* local delivery: re-run the receive parsing */
			rc = lnet_parse_local(ni, msg);
			if (!rc)
				continue;

		} else {
			/* routed message: try to forward it */
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fall through */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* failure or explicit drop: discard and finalize */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(ni, msg, rc);
	}
}
645
/**
 * Process delayed messages for scheduled rules
 * This function can either be called by delay_rule_daemon, or by lnet_finalise
 */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule *rule;
	struct list_head msgs;

	INIT_LIST_HEAD(&msgs);
	while (1) {
		/* unlocked peek first: avoid taking the lock when idle */
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		/* re-check under the lock: another thread may have drained
		 * the list since the unlocked peek */
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_entry(delay_dd.dd_sched_rules.next,
				  struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		/* collect messages whose delay has expired */
		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}
679
/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	/* announce startup to lnet_delay_rule_add() blocked on dd_ctl_waitq */
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		/* sleep until a rule is scheduled or shutdown is requested */
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	/* announce shutdown to lnet_delay_rule_del() */
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
701
/**
 * Timer callback: hand \a rule to the delay daemon for processing.
 * Takes a reference on the rule, which lnet_delay_rule_check() drops
 * after servicing it. Skips scheduling if the rule is already queued
 * or the daemon has stopped.
 */
static void
delay_timer_cb(struct timer_list *t)
{
	struct lnet_delay_rule *rule = from_timer(rule, t, dl_timer);

	spin_lock_bh(&delay_dd.dd_lock);
	if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
		atomic_inc(&rule->dl_refcount);
		list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
		wake_up(&delay_dd.dd_waitq);
	}
	spin_unlock_bh(&delay_dd.dd_lock);
}
715
716 /**
717  * Add a new delay rule to LNet
718  * There is no check for duplicated delay rule, all rules will be checked for
719  * incoming message.
720  */
721 int
722 lnet_delay_rule_add(struct lnet_fault_attr *attr)
723 {
724         struct lnet_delay_rule *rule;
725         int rc = 0;
726
727         if (attr->u.delay.la_rate & attr->u.delay.la_interval) {
728                 CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
729                        attr->u.delay.la_rate, attr->u.delay.la_interval);
730                 return -EINVAL;
731         }
732
733         if (!attr->u.delay.la_latency) {
734                 CDEBUG(D_NET, "delay latency cannot be zero\n");
735                 return -EINVAL;
736         }
737
738         if (lnet_fault_attr_validate(attr))
739                 return -EINVAL;
740
741         CFS_ALLOC_PTR(rule);
742         if (!rule)
743                 return -ENOMEM;
744
745         mutex_lock(&delay_dd.dd_mutex);
746         if (!delay_dd.dd_running) {
747                 struct task_struct *task;
748
749                 /**
750                  *  NB: although LND threads will process delayed message
751                  * in lnet_finalize, but there is no guarantee that LND
752                  * threads will be waken up if no other message needs to
753                  * be handled.
754                  * Only one daemon thread, performance is not the concern
755                  * of this simualation module.
756                  */
757                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
758                 if (IS_ERR(task)) {
759                         rc = PTR_ERR(task);
760                         goto failed;
761                 }
762                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
763         }
764
765         timer_setup(&rule->dl_timer, delay_timer_cb, 0);
766
767         spin_lock_init(&rule->dl_lock);
768         INIT_LIST_HEAD(&rule->dl_msg_list);
769         INIT_LIST_HEAD(&rule->dl_sched_link);
770
771         rule->dl_attr = *attr;
772         if (attr->u.delay.la_interval) {
773                 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
774                 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
775                                                      attr->u.delay.la_interval);
776         } else {
777                 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
778         }
779
780         rule->dl_msg_send = -1;
781
782         lnet_net_lock(LNET_LOCK_EX);
783         atomic_set(&rule->dl_refcount, 1);
784         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
785         lnet_net_unlock(LNET_LOCK_EX);
786
787         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
788                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
789                attr->u.delay.la_rate);
790
791         mutex_unlock(&delay_dd.dd_mutex);
792         return 0;
793 failed:
794         mutex_unlock(&delay_dd.dd_mutex);
795         CFS_FREE_PTR(rule);
796         return rc;
797 }
798
/**
 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
 * and \a dst are zero, all rules will be removed, otherwise only matched rules
 * will be removed.
 * If \a src is zero, then all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, then all rules that have \a src as source will be removed.
 *
 * When a delay rule is removed, all delayed messages of this rule will be
 * processed immediately (dropped if \a shutdown is true, delivered otherwise).
 * Returns the number of rules removed.
 */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown) {
		/* zero src/dst so every rule matches below */
		src = 0;
		dst = 0;
	}

	/* dd_mutex serialises against lnet_delay_rule_add and daemon stop */
	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		/* make sure the timer cannot re-schedule this rule, then
		 * drain all of its delayed messages onto msg_list */
		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rule, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		/* wait for the daemon to acknowledge the stop */
		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}
875
876 /**
877  * List Delay Rule at position of \a pos
878  */
879 int
880 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
881                      struct lnet_fault_stat *stat)
882 {
883         struct lnet_delay_rule *rule;
884         int cpt;
885         int i = 0;
886         int rc = -ENOENT;
887
888         cpt = lnet_net_lock_current();
889         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
890                 if (i++ < pos)
891                         continue;
892
893                 spin_lock(&rule->dl_lock);
894                 *attr = rule->dl_attr;
895                 *stat = rule->dl_stat;
896                 spin_unlock(&rule->dl_lock);
897                 rc = 0;
898                 break;
899         }
900
901         lnet_net_unlock(cpt);
902         return rc;
903 }
904
905 /**
906  * reset counters for all Delay Rules
907  */
908 void
909 lnet_delay_rule_reset(void)
910 {
911         struct lnet_delay_rule *rule;
912         int cpt;
913
914         cpt = lnet_net_lock_current();
915
916         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
917                 struct lnet_fault_attr *attr = &rule->dl_attr;
918
919                 spin_lock(&rule->dl_lock);
920
921                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
922                 if (attr->u.delay.la_rate) {
923                         rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
924                 } else {
925                         rule->dl_delay_time = cfs_time_shift(cfs_rand() %
926                                                 attr->u.delay.la_interval);
927                         rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
928                 }
929                 spin_unlock(&rule->dl_lock);
930         }
931
932         lnet_net_unlock(cpt);
933 }
934
935 int
936 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
937 {
938         struct lnet_fault_attr *attr;
939         struct lnet_fault_stat *stat;
940
941         attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
942
943         switch (opc) {
944         default:
945                 return -EINVAL;
946
947         case LNET_CTL_DROP_ADD:
948                 if (!attr)
949                         return -EINVAL;
950
951                 return lnet_drop_rule_add(attr);
952
953         case LNET_CTL_DROP_DEL:
954                 if (!attr)
955                         return -EINVAL;
956
957                 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
958                                                      attr->fa_dst);
959                 return 0;
960
961         case LNET_CTL_DROP_RESET:
962                 lnet_drop_rule_reset();
963                 return 0;
964
965         case LNET_CTL_DROP_LIST:
966                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
967                 if (!attr || !stat)
968                         return -EINVAL;
969
970                 return lnet_drop_rule_list(data->ioc_count, attr, stat);
971
972         case LNET_CTL_DELAY_ADD:
973                 if (!attr)
974                         return -EINVAL;
975
976                 return lnet_delay_rule_add(attr);
977
978         case LNET_CTL_DELAY_DEL:
979                 if (!attr)
980                         return -EINVAL;
981
982                 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
983                                                       attr->fa_dst, false);
984                 return 0;
985
986         case LNET_CTL_DELAY_RESET:
987                 lnet_delay_rule_reset();
988                 return 0;
989
990         case LNET_CTL_DELAY_LIST:
991                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
992                 if (!attr || !stat)
993                         return -EINVAL;
994
995                 return lnet_delay_rule_list(data->ioc_count, attr, stat);
996         }
997 }
998
/* One-time setup for the fault-simulation module: verify the message-type
 * bit mapping at compile time and initialize the delay daemon's
 * synchronization primitives and scheduling list.  Always returns 0.
 */
int
lnet_fault_init(void)
{
	/* the *_BIT masks must stay in sync with the LNET_MSG_* enum values */
	BUILD_BUG_ON(LNET_PUT_BIT != 1 << LNET_MSG_PUT);
	BUILD_BUG_ON(LNET_ACK_BIT != 1 << LNET_MSG_ACK);
	BUILD_BUG_ON(LNET_GET_BIT != 1 << LNET_MSG_GET);
	BUILD_BUG_ON(LNET_REPLY_BIT != 1 << LNET_MSG_REPLY);

	mutex_init(&delay_dd.dd_mutex);
	spin_lock_init(&delay_dd.dd_lock);
	init_waitqueue_head(&delay_dd.dd_waitq);
	init_waitqueue_head(&delay_dd.dd_ctl_waitq);
	INIT_LIST_HEAD(&delay_dd.dd_sched_rules);

	return 0;
}
1015
/* Module teardown: remove every drop and delay rule (wildcard nid 0 matches
 * all; shutdown=true makes lnet_delay_rule_del drop pending delayed
 * messages and stop the delay daemon), then assert all lists are empty.
 */
void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(0, 0);
	lnet_delay_rule_del(0, 0, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}