1 // SPDX-License-Identifier: GPL-2.0
5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License version 2 only,
9 * as published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License version 2 for more details (a copy is included
15 * in the LICENSE file that accompanied this code).
17 * You should have received a copy of the GNU General Public License
18 * version 2 along with this program; If not, see
19 * http://www.gnu.org/licenses/gpl-2.0.html
24 * Copyright (c) 2014, Intel Corporation.
27 * This file is part of Lustre, http://www.lustre.org/
28 * Lustre is a trademark of Seagate, Inc.
30 * lnet/lnet/net_fault.c
32 * Lustre network fault simulation
34 * Author: liang.zhen@intel.com
37 #define DEBUG_SUBSYSTEM S_LNET
39 #include <linux/lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
42 #define LNET_MSG_MASK (LNET_PUT_BIT | LNET_ACK_BIT | \
43 LNET_GET_BIT | LNET_REPLY_BIT)
45 struct lnet_drop_rule {
46 /** link chain on the_lnet.ln_drop_rules */
47 struct list_head dr_link;
48 /** attributes of this rule */
49 struct lnet_fault_attr dr_attr;
50 /** lock to protect \a dr_drop_at and \a dr_stat */
53 * the message sequence to drop, which means message is dropped when
54 * dr_stat.drs_count == dr_drop_at
56 unsigned long dr_drop_at;
58 * seconds to drop the next message, it's exclusive with dr_drop_at
60 unsigned long dr_drop_time;
61 /** baseline to caculate dr_drop_time */
62 unsigned long dr_time_base;
63 /** statistic of dropped messages */
64 struct lnet_fault_stat dr_stat;
68 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
70 if (nid == msg_nid || nid == LNET_NID_ANY)
73 if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
76 /* 255.255.255.255@net is wildcard for all addresses in a network */
77 return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
81 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
82 lnet_nid_t dst, unsigned int type, unsigned int portal)
84 if (!lnet_fault_nid_match(attr->fa_src, src) ||
85 !lnet_fault_nid_match(attr->fa_dst, dst))
88 if (!(attr->fa_msg_mask & (1 << type)))
92 * NB: ACK and REPLY have no portal, but they should have been
93 * rejected by message mask
95 if (attr->fa_ptl_mask && /* has portal filter */
96 !(attr->fa_ptl_mask & (1ULL << portal)))
103 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
105 if (!attr->fa_msg_mask)
106 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
108 if (!attr->fa_ptl_mask) /* no portal filter */
111 /* NB: only PUT and GET can be filtered if portal filter has been set */
112 attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
113 if (!attr->fa_msg_mask) {
114 CDEBUG(D_NET, "can't find valid message type bits %x\n",
122 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
124 /* NB: fs_counter is NOT updated by this function */
142 * LNet message drop simulation
146 * Add a new drop rule to LNet
147 * There is no check for duplicated drop rule, all rules will be checked for
151 lnet_drop_rule_add(struct lnet_fault_attr *attr)
153 struct lnet_drop_rule *rule;
155 if (attr->u.drop.da_rate & attr->u.drop.da_interval) {
156 CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
157 attr->u.drop.da_rate, attr->u.drop.da_interval);
161 if (lnet_fault_attr_validate(attr))
168 spin_lock_init(&rule->dr_lock);
170 rule->dr_attr = *attr;
171 if (attr->u.drop.da_interval) {
172 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
173 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
174 attr->u.drop.da_interval);
176 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
179 lnet_net_lock(LNET_LOCK_EX);
180 list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
181 lnet_net_unlock(LNET_LOCK_EX);
183 CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
184 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
185 attr->u.drop.da_rate, attr->u.drop.da_interval);
190 * Remove matched drop rules from lnet, all rules that can match \a src and
191 * \a dst will be removed.
192 * If \a src is zero, then all rules have \a dst as destination will be remove
193 * If \a dst is zero, then all rules have \a src as source will be removed
194 * If both of them are zero, all rules will be removed
197 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
199 struct lnet_drop_rule *rule;
200 struct lnet_drop_rule *tmp;
201 struct list_head zombies;
204 INIT_LIST_HEAD(&zombies);
206 lnet_net_lock(LNET_LOCK_EX);
207 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
208 if (rule->dr_attr.fa_src != src && src)
211 if (rule->dr_attr.fa_dst != dst && dst)
214 list_move(&rule->dr_link, &zombies);
216 lnet_net_unlock(LNET_LOCK_EX);
218 list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
219 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
220 libcfs_nid2str(rule->dr_attr.fa_src),
221 libcfs_nid2str(rule->dr_attr.fa_dst),
222 rule->dr_attr.u.drop.da_rate,
223 rule->dr_attr.u.drop.da_interval);
225 list_del(&rule->dr_link);
234 * List drop rule at position of \a pos
237 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
238 struct lnet_fault_stat *stat)
240 struct lnet_drop_rule *rule;
245 cpt = lnet_net_lock_current();
246 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
250 spin_lock(&rule->dr_lock);
251 *attr = rule->dr_attr;
252 *stat = rule->dr_stat;
253 spin_unlock(&rule->dr_lock);
258 lnet_net_unlock(cpt);
263 * reset counters for all drop rules
266 lnet_drop_rule_reset(void)
268 struct lnet_drop_rule *rule;
271 cpt = lnet_net_lock_current();
273 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
274 struct lnet_fault_attr *attr = &rule->dr_attr;
276 spin_lock(&rule->dr_lock);
278 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
279 if (attr->u.drop.da_rate) {
280 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
282 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
283 attr->u.drop.da_interval);
284 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
286 spin_unlock(&rule->dr_lock);
289 lnet_net_unlock(cpt);
293 * check source/destination NID, portal, message type and drop rate,
294 * decide whether should drop this message or not
297 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
298 lnet_nid_t dst, unsigned int type, unsigned int portal)
300 struct lnet_fault_attr *attr = &rule->dr_attr;
303 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
306 /* match this rule, check drop rate now */
307 spin_lock(&rule->dr_lock);
308 if (rule->dr_drop_time) { /* time based drop */
309 unsigned long now = cfs_time_current();
311 rule->dr_stat.fs_count++;
312 drop = cfs_time_aftereq(now, rule->dr_drop_time);
314 if (cfs_time_after(now, rule->dr_time_base))
315 rule->dr_time_base = now;
317 rule->dr_drop_time = rule->dr_time_base +
318 cfs_time_seconds(cfs_rand() %
319 attr->u.drop.da_interval);
320 rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
322 CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
323 libcfs_nid2str(attr->fa_src),
324 libcfs_nid2str(attr->fa_dst),
328 } else { /* rate based drop */
329 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
331 if (!do_div(rule->dr_stat.fs_count, attr->u.drop.da_rate)) {
332 rule->dr_drop_at = rule->dr_stat.fs_count +
333 cfs_rand() % attr->u.drop.da_rate;
334 CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
335 libcfs_nid2str(attr->fa_src),
336 libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
340 if (drop) { /* drop this message, update counters */
341 lnet_fault_stat_inc(&rule->dr_stat, type);
342 rule->dr_stat.u.drop.ds_dropped++;
345 spin_unlock(&rule->dr_lock);
350 * Check if message from \a src to \a dst can match any existed drop rule
353 lnet_drop_rule_match(struct lnet_hdr *hdr)
355 struct lnet_drop_rule *rule;
356 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
357 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
358 unsigned int typ = le32_to_cpu(hdr->type);
359 unsigned int ptl = -1;
364 * NB: if Portal is specified, then only PUT and GET will be
365 * filtered by drop rule
367 if (typ == LNET_MSG_PUT)
368 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
369 else if (typ == LNET_MSG_GET)
370 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
372 cpt = lnet_net_lock_current();
373 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
374 drop = drop_rule_match(rule, src, dst, typ, ptl);
379 lnet_net_unlock(cpt);
384 * LNet Delay Simulation
386 /** timestamp (second) to send delayed message */
387 #define msg_delay_send msg_ev.hdr_data
389 struct lnet_delay_rule {
390 /** link chain on the_lnet.ln_delay_rules */
391 struct list_head dl_link;
392 /** link chain on delay_dd.dd_sched_rules */
393 struct list_head dl_sched_link;
394 /** attributes of this rule */
395 struct lnet_fault_attr dl_attr;
396 /** lock to protect \a below members */
398 /** refcount of delay rule */
399 atomic_t dl_refcount;
401 * the message sequence to delay, which means message is delayed when
402 * dl_stat.fs_count == dl_delay_at
404 unsigned long dl_delay_at;
406 * seconds to delay the next message, it's exclusive with dl_delay_at
408 unsigned long dl_delay_time;
409 /** baseline to caculate dl_delay_time */
410 unsigned long dl_time_base;
411 /** jiffies to send the next delayed message */
412 unsigned long dl_msg_send;
413 /** delayed message list */
414 struct list_head dl_msg_list;
415 /** statistic of delayed messages */
416 struct lnet_fault_stat dl_stat;
417 /** timer to wakeup delay_daemon */
418 struct timer_list dl_timer;
421 struct delay_daemon_data {
422 /** serialise rule add/remove */
423 struct mutex dd_mutex;
424 /** protect rules on \a dd_sched_rules */
426 /** scheduled delay rules (by timer) */
427 struct list_head dd_sched_rules;
428 /** daemon thread sleeps at here */
429 wait_queue_head_t dd_waitq;
430 /** controller (lctl command) wait at here */
431 wait_queue_head_t dd_ctl_waitq;
432 /** daemon is running */
433 unsigned int dd_running;
434 /** daemon stopped */
435 unsigned int dd_stopped;
438 static struct delay_daemon_data delay_dd;
/**
 * Round a jiffies deadline up to the next whole second (in jiffies).
 */
static unsigned long
round_timeout(unsigned long timeout)
{
	return cfs_time_seconds((unsigned int)
			cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}
448 delay_rule_decref(struct lnet_delay_rule *rule)
450 if (atomic_dec_and_test(&rule->dl_refcount)) {
451 LASSERT(list_empty(&rule->dl_sched_link));
452 LASSERT(list_empty(&rule->dl_msg_list));
453 LASSERT(list_empty(&rule->dl_link));
460 * check source/destination NID, portal, message type and delay rate,
461 * decide whether should delay this message or not
464 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
465 lnet_nid_t dst, unsigned int type, unsigned int portal,
466 struct lnet_msg *msg)
468 struct lnet_fault_attr *attr = &rule->dl_attr;
471 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
474 /* match this rule, check delay rate now */
475 spin_lock(&rule->dl_lock);
476 if (rule->dl_delay_time) { /* time based delay */
477 unsigned long now = cfs_time_current();
479 rule->dl_stat.fs_count++;
480 delay = cfs_time_aftereq(now, rule->dl_delay_time);
482 if (cfs_time_after(now, rule->dl_time_base))
483 rule->dl_time_base = now;
485 rule->dl_delay_time = rule->dl_time_base +
486 cfs_time_seconds(cfs_rand() %
487 attr->u.delay.la_interval);
488 rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
490 CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
491 libcfs_nid2str(attr->fa_src),
492 libcfs_nid2str(attr->fa_dst),
493 rule->dl_delay_time);
496 } else { /* rate based delay */
497 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
498 /* generate the next random rate sequence */
499 if (!do_div(rule->dl_stat.fs_count, attr->u.delay.la_rate)) {
500 rule->dl_delay_at = rule->dl_stat.fs_count +
501 cfs_rand() % attr->u.delay.la_rate;
502 CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
503 libcfs_nid2str(attr->fa_src),
504 libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
509 spin_unlock(&rule->dl_lock);
513 /* delay this message, update counters */
514 lnet_fault_stat_inc(&rule->dl_stat, type);
515 rule->dl_stat.u.delay.ls_delayed++;
517 list_add_tail(&msg->msg_list, &rule->dl_msg_list);
518 msg->msg_delay_send = round_timeout(
519 cfs_time_shift(attr->u.delay.la_latency));
520 if (rule->dl_msg_send == -1) {
521 rule->dl_msg_send = msg->msg_delay_send;
522 mod_timer(&rule->dl_timer, rule->dl_msg_send);
525 spin_unlock(&rule->dl_lock);
530 * check if \a msg can match any Delay Rule, receiving of this message
531 * will be delayed if there is a match.
534 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
536 struct lnet_delay_rule *rule;
537 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
538 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
539 unsigned int typ = le32_to_cpu(hdr->type);
540 unsigned int ptl = -1;
542 /* NB: called with hold of lnet_net_lock */
545 * NB: if Portal is specified, then only PUT and GET will be
546 * filtered by delay rule
548 if (typ == LNET_MSG_PUT)
549 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
550 else if (typ == LNET_MSG_GET)
551 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
553 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
554 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
561 /** check out delayed messages for send */
563 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
564 struct list_head *msg_list)
566 struct lnet_msg *msg;
567 struct lnet_msg *tmp;
568 unsigned long now = cfs_time_current();
570 if (!all && rule->dl_msg_send > now)
573 spin_lock(&rule->dl_lock);
574 list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
575 if (!all && msg->msg_delay_send > now)
578 msg->msg_delay_send = 0;
579 list_move_tail(&msg->msg_list, msg_list);
582 if (list_empty(&rule->dl_msg_list)) {
583 del_timer(&rule->dl_timer);
584 rule->dl_msg_send = -1;
586 } else if (!list_empty(msg_list)) {
588 * dequeued some timedout messages, update timer for the
589 * next delayed message on rule
591 msg = list_entry(rule->dl_msg_list.next,
592 struct lnet_msg, msg_list);
593 rule->dl_msg_send = msg->msg_delay_send;
594 mod_timer(&rule->dl_timer, rule->dl_msg_send);
596 spin_unlock(&rule->dl_lock);
600 delayed_msg_process(struct list_head *msg_list, bool drop)
602 struct lnet_msg *msg;
604 while (!list_empty(msg_list)) {
609 msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
610 LASSERT(msg->msg_rxpeer);
612 ni = msg->msg_rxpeer->lp_ni;
613 cpt = msg->msg_rx_cpt;
615 list_del_init(&msg->msg_list);
619 } else if (!msg->msg_routing) {
620 rc = lnet_parse_local(ni, msg);
626 rc = lnet_parse_forward_locked(ni, msg);
627 lnet_net_unlock(cpt);
631 lnet_ni_recv(ni, msg->msg_private, msg, 0,
632 0, msg->msg_len, msg->msg_len);
634 case LNET_CREDIT_WAIT:
636 default: /* failures */
641 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
642 lnet_finalize(ni, msg, rc);
647 * Process delayed messages for scheduled rules
648 * This function can either be called by delay_rule_daemon, or by lnet_finalise
651 lnet_delay_rule_check(void)
653 struct lnet_delay_rule *rule;
654 struct list_head msgs;
656 INIT_LIST_HEAD(&msgs);
658 if (list_empty(&delay_dd.dd_sched_rules))
661 spin_lock_bh(&delay_dd.dd_lock);
662 if (list_empty(&delay_dd.dd_sched_rules)) {
663 spin_unlock_bh(&delay_dd.dd_lock);
667 rule = list_entry(delay_dd.dd_sched_rules.next,
668 struct lnet_delay_rule, dl_sched_link);
669 list_del_init(&rule->dl_sched_link);
670 spin_unlock_bh(&delay_dd.dd_lock);
672 delayed_msg_check(rule, false, &msgs);
673 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
676 if (!list_empty(&msgs))
677 delayed_msg_process(&msgs, false);
680 /** daemon thread to handle delayed messages */
682 lnet_delay_rule_daemon(void *arg)
684 delay_dd.dd_running = 1;
685 wake_up(&delay_dd.dd_ctl_waitq);
687 while (delay_dd.dd_running) {
688 wait_event_interruptible(delay_dd.dd_waitq,
689 !delay_dd.dd_running ||
690 !list_empty(&delay_dd.dd_sched_rules));
691 lnet_delay_rule_check();
694 /* in case more rules have been enqueued after my last check */
695 lnet_delay_rule_check();
696 delay_dd.dd_stopped = 1;
697 wake_up(&delay_dd.dd_ctl_waitq);
703 delay_timer_cb(struct timer_list *t)
705 struct lnet_delay_rule *rule = from_timer(rule, t, dl_timer);
707 spin_lock_bh(&delay_dd.dd_lock);
708 if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
709 atomic_inc(&rule->dl_refcount);
710 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
711 wake_up(&delay_dd.dd_waitq);
713 spin_unlock_bh(&delay_dd.dd_lock);
717 * Add a new delay rule to LNet
718 * There is no check for duplicated delay rule, all rules will be checked for
722 lnet_delay_rule_add(struct lnet_fault_attr *attr)
724 struct lnet_delay_rule *rule;
727 if (attr->u.delay.la_rate & attr->u.delay.la_interval) {
728 CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
729 attr->u.delay.la_rate, attr->u.delay.la_interval);
733 if (!attr->u.delay.la_latency) {
734 CDEBUG(D_NET, "delay latency cannot be zero\n");
738 if (lnet_fault_attr_validate(attr))
745 mutex_lock(&delay_dd.dd_mutex);
746 if (!delay_dd.dd_running) {
747 struct task_struct *task;
750 * NB: although LND threads will process delayed message
751 * in lnet_finalize, but there is no guarantee that LND
752 * threads will be waken up if no other message needs to
754 * Only one daemon thread, performance is not the concern
755 * of this simualation module.
757 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
762 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
765 timer_setup(&rule->dl_timer, delay_timer_cb, 0);
767 spin_lock_init(&rule->dl_lock);
768 INIT_LIST_HEAD(&rule->dl_msg_list);
769 INIT_LIST_HEAD(&rule->dl_sched_link);
771 rule->dl_attr = *attr;
772 if (attr->u.delay.la_interval) {
773 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
774 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
775 attr->u.delay.la_interval);
777 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
780 rule->dl_msg_send = -1;
782 lnet_net_lock(LNET_LOCK_EX);
783 atomic_set(&rule->dl_refcount, 1);
784 list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
785 lnet_net_unlock(LNET_LOCK_EX);
787 CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
788 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
789 attr->u.delay.la_rate);
791 mutex_unlock(&delay_dd.dd_mutex);
794 mutex_unlock(&delay_dd.dd_mutex);
800 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
801 * and \a dst are zero, all rules will be removed, otherwise only matched rules
803 * If \a src is zero, then all rules have \a dst as destination will be remove
804 * If \a dst is zero, then all rules have \a src as source will be removed
806 * When a delay rule is removed, all delayed messages of this rule will be
807 * processed immediately.
810 lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
812 struct lnet_delay_rule *rule;
813 struct lnet_delay_rule *tmp;
814 struct list_head rule_list;
815 struct list_head msg_list;
819 INIT_LIST_HEAD(&rule_list);
820 INIT_LIST_HEAD(&msg_list);
827 mutex_lock(&delay_dd.dd_mutex);
828 lnet_net_lock(LNET_LOCK_EX);
830 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
831 if (rule->dl_attr.fa_src != src && src)
834 if (rule->dl_attr.fa_dst != dst && dst)
837 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
838 libcfs_nid2str(rule->dl_attr.fa_src),
839 libcfs_nid2str(rule->dl_attr.fa_dst),
840 rule->dl_attr.u.delay.la_rate,
841 rule->dl_attr.u.delay.la_interval);
842 /* refcount is taken over by rule_list */
843 list_move(&rule->dl_link, &rule_list);
846 /* check if we need to shutdown delay_daemon */
847 cleanup = list_empty(&the_lnet.ln_delay_rules) &&
848 !list_empty(&rule_list);
849 lnet_net_unlock(LNET_LOCK_EX);
851 list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
852 list_del_init(&rule->dl_link);
854 del_timer_sync(&rule->dl_timer);
855 delayed_msg_check(rule, true, &msg_list);
856 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
860 if (cleanup) { /* no more delay rule, shutdown delay_daemon */
861 LASSERT(delay_dd.dd_running);
862 delay_dd.dd_running = 0;
863 wake_up(&delay_dd.dd_waitq);
865 while (!delay_dd.dd_stopped)
866 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
868 mutex_unlock(&delay_dd.dd_mutex);
870 if (!list_empty(&msg_list))
871 delayed_msg_process(&msg_list, shutdown);
877 * List Delay Rule at position of \a pos
880 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
881 struct lnet_fault_stat *stat)
883 struct lnet_delay_rule *rule;
888 cpt = lnet_net_lock_current();
889 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
893 spin_lock(&rule->dl_lock);
894 *attr = rule->dl_attr;
895 *stat = rule->dl_stat;
896 spin_unlock(&rule->dl_lock);
901 lnet_net_unlock(cpt);
906 * reset counters for all Delay Rules
909 lnet_delay_rule_reset(void)
911 struct lnet_delay_rule *rule;
914 cpt = lnet_net_lock_current();
916 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
917 struct lnet_fault_attr *attr = &rule->dl_attr;
919 spin_lock(&rule->dl_lock);
921 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
922 if (attr->u.delay.la_rate) {
923 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
925 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
926 attr->u.delay.la_interval);
927 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
929 spin_unlock(&rule->dl_lock);
932 lnet_net_unlock(cpt);
936 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
938 struct lnet_fault_attr *attr;
939 struct lnet_fault_stat *stat;
941 attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
947 case LNET_CTL_DROP_ADD:
951 return lnet_drop_rule_add(attr);
953 case LNET_CTL_DROP_DEL:
957 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
961 case LNET_CTL_DROP_RESET:
962 lnet_drop_rule_reset();
965 case LNET_CTL_DROP_LIST:
966 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
970 return lnet_drop_rule_list(data->ioc_count, attr, stat);
972 case LNET_CTL_DELAY_ADD:
976 return lnet_delay_rule_add(attr);
978 case LNET_CTL_DELAY_DEL:
982 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
983 attr->fa_dst, false);
986 case LNET_CTL_DELAY_RESET:
987 lnet_delay_rule_reset();
990 case LNET_CTL_DELAY_LIST:
991 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
995 return lnet_delay_rule_list(data->ioc_count, attr, stat);
1000 lnet_fault_init(void)
1002 BUILD_BUG_ON(LNET_PUT_BIT != 1 << LNET_MSG_PUT);
1003 BUILD_BUG_ON(LNET_ACK_BIT != 1 << LNET_MSG_ACK);
1004 BUILD_BUG_ON(LNET_GET_BIT != 1 << LNET_MSG_GET);
1005 BUILD_BUG_ON(LNET_REPLY_BIT != 1 << LNET_MSG_REPLY);
1007 mutex_init(&delay_dd.dd_mutex);
1008 spin_lock_init(&delay_dd.dd_lock);
1009 init_waitqueue_head(&delay_dd.dd_waitq);
1010 init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1011 INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1017 lnet_fault_fini(void)
1019 lnet_drop_rule_del(0, 0);
1020 lnet_delay_rule_del(0, 0, true);
1022 LASSERT(list_empty(&the_lnet.ln_drop_rules));
1023 LASSERT(list_empty(&the_lnet.ln_delay_rules));
1024 LASSERT(list_empty(&delay_dd.dd_sched_rules));