cece3c7c69be399d39031ab80fcbe697a8b97b38
[sfrench/cifs-2.6.git] / arch / ia64 / sn / kernel / xpc_main.c
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2004-2005 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8
9
10 /*
11  * Cross Partition Communication (XPC) support - standard version.
12  *
13  *      XPC provides a message passing capability that crosses partition
14  *      boundaries. This module is made up of two parts:
15  *
16  *          partition   This part detects the presence/absence of other
17  *                      partitions. It provides a heartbeat and monitors
18  *                      the heartbeats of other partitions.
19  *
20  *          channel     This part manages the channels and sends/receives
21  *                      messages across them to/from other partitions.
22  *
23  *      There are a couple of additional functions residing in XP, which
24  *      provide an interface to XPC for its users.
25  *
26  *
27  *      Caveats:
28  *
29  *        . We currently have no way to determine which nasid an IPI came
30  *          from. Thus, xpc_IPI_send() does a remote AMO write followed by
31  *          an IPI. The AMO indicates where data is to be pulled from, so
32  *          after the IPI arrives, the remote partition checks the AMO word.
33  *          The IPI can actually arrive before the AMO however, so other code
34  *          must periodically check for this case. Also, remote AMO operations
35  *          do not reliably time out. Thus we do a remote PIO read solely to
36  *          know whether the remote partition is down and whether we should
37  *          stop sending IPIs to it. This remote PIO read operation is set up
38  *          in a special nofault region so SAL knows to ignore (and cleanup)
39  *          any errors due to the remote AMO write, PIO read, and/or PIO
40  *          write operations.
41  *
42  *          If/when new hardware solves this IPI problem, we should abandon
43  *          the current approach.
44  *
45  */
46
47
48 #include <linux/kernel.h>
49 #include <linux/module.h>
50 #include <linux/init.h>
51 #include <linux/sched.h>
52 #include <linux/syscalls.h>
53 #include <linux/cache.h>
54 #include <linux/interrupt.h>
55 #include <linux/slab.h>
56 #include <linux/delay.h>
57 #include <linux/reboot.h>
58 #include <asm/sn/intr.h>
59 #include <asm/sn/sn_sal.h>
60 #include <asm/uaccess.h>
61 #include "xpc.h"
62
63
64 /* define two XPC debug device structures to be used with dev_dbg() et al */
65
66 struct device_driver xpc_dbg_name = {
67         .name = "xpc"
68 };
69
70 struct device xpc_part_dbg_subname = {
71         .bus_id = {0},          /* set to "part" at xpc_init() time */
72         .driver = &xpc_dbg_name
73 };
74
75 struct device xpc_chan_dbg_subname = {
76         .bus_id = {0},          /* set to "chan" at xpc_init() time */
77         .driver = &xpc_dbg_name
78 };
79
80 struct device *xpc_part = &xpc_part_dbg_subname;
81 struct device *xpc_chan = &xpc_chan_dbg_subname;
82
83
84 /* systune related variables for /proc/sys directories */
85
86 static int xpc_hb_interval = XPC_HB_DEFAULT_INTERVAL;
87 static int xpc_hb_min_interval = 1;
88 static int xpc_hb_max_interval = 10;
89
90 static int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_INTERVAL;
91 static int xpc_hb_check_min_interval = 10;
92 static int xpc_hb_check_max_interval = 120;
93
94 int xpc_disengage_request_timelimit = XPC_DISENGAGE_REQUEST_DEFAULT_TIMELIMIT;
95 static int xpc_disengage_request_min_timelimit = 0;
96 static int xpc_disengage_request_max_timelimit = 120;
97
98 static ctl_table xpc_sys_xpc_hb_dir[] = {
99         {
100                 1,
101                 "hb_interval",
102                 &xpc_hb_interval,
103                 sizeof(int),
104                 0644,
105                 NULL,
106                 &proc_dointvec_minmax,
107                 &sysctl_intvec,
108                 NULL,
109                 &xpc_hb_min_interval,
110                 &xpc_hb_max_interval
111         },
112         {
113                 2,
114                 "hb_check_interval",
115                 &xpc_hb_check_interval,
116                 sizeof(int),
117                 0644,
118                 NULL,
119                 &proc_dointvec_minmax,
120                 &sysctl_intvec,
121                 NULL,
122                 &xpc_hb_check_min_interval,
123                 &xpc_hb_check_max_interval
124         },
125         {0}
126 };
127 static ctl_table xpc_sys_xpc_dir[] = {
128         {
129                 1,
130                 "hb",
131                 NULL,
132                 0,
133                 0555,
134                 xpc_sys_xpc_hb_dir
135         },
136         {
137                 2,
138                 "disengage_request_timelimit",
139                 &xpc_disengage_request_timelimit,
140                 sizeof(int),
141                 0644,
142                 NULL,
143                 &proc_dointvec_minmax,
144                 &sysctl_intvec,
145                 NULL,
146                 &xpc_disengage_request_min_timelimit,
147                 &xpc_disengage_request_max_timelimit
148         },
149         {0}
150 };
151 static ctl_table xpc_sys_dir[] = {
152         {
153                 1,
154                 "xpc",
155                 NULL,
156                 0,
157                 0555,
158                 xpc_sys_xpc_dir
159         },
160         {0}
161 };
162 static struct ctl_table_header *xpc_sysctl;
163
164
165 /* #of IRQs received */
166 static atomic_t xpc_act_IRQ_rcvd;
167
168 /* IRQ handler notifies this wait queue on receipt of an IRQ */
169 static DECLARE_WAIT_QUEUE_HEAD(xpc_act_IRQ_wq);
170
171 static unsigned long xpc_hb_check_timeout;
172
173 /* notification that the xpc_hb_checker thread has exited */
174 static DECLARE_MUTEX_LOCKED(xpc_hb_checker_exited);
175
176 /* notification that the xpc_discovery thread has exited */
177 static DECLARE_MUTEX_LOCKED(xpc_discovery_exited);
178
179
180 static struct timer_list xpc_hb_timer;
181
182
183 static void xpc_kthread_waitmsgs(struct xpc_partition *, struct xpc_channel *);
184
185
186 static int xpc_system_reboot(struct notifier_block *, unsigned long, void *);
187 static struct notifier_block xpc_reboot_notifier = {
188         .notifier_call = xpc_system_reboot,
189 };
190
191
192 /*
193  * Timer function to enforce the timelimit on the partition disengage request.
194  */
195 static void
196 xpc_timeout_partition_disengage_request(unsigned long data)
197 {
198         struct xpc_partition *part = (struct xpc_partition *) data;
199
200
201         DBUG_ON(jiffies < part->disengage_request_timeout);
202
203         (void) xpc_partition_disengaged(part);
204
205         DBUG_ON(part->disengage_request_timeout != 0);
206         DBUG_ON(xpc_partition_engaged(1UL << XPC_PARTID(part)) != 0);
207 }
208
209
210 /*
211  * Notify the heartbeat check thread that an IRQ has been received.
212  */
213 static irqreturn_t
214 xpc_act_IRQ_handler(int irq, void *dev_id, struct pt_regs *regs)
215 {
216         atomic_inc(&xpc_act_IRQ_rcvd);
217         wake_up_interruptible(&xpc_act_IRQ_wq);
218         return IRQ_HANDLED;
219 }
220
221
222 /*
223  * Timer to produce the heartbeat.  The timer structures function is
224  * already set when this is initially called.  A tunable is used to
225  * specify when the next timeout should occur.
226  */
227 static void
228 xpc_hb_beater(unsigned long dummy)
229 {
230         xpc_vars->heartbeat++;
231
232         if (jiffies >= xpc_hb_check_timeout) {
233                 wake_up_interruptible(&xpc_act_IRQ_wq);
234         }
235
236         xpc_hb_timer.expires = jiffies + (xpc_hb_interval * HZ);
237         add_timer(&xpc_hb_timer);
238 }
239
240
241 /*
242  * This thread is responsible for nearly all of the partition
243  * activation/deactivation.
244  */
245 static int
246 xpc_hb_checker(void *ignore)
247 {
248         int last_IRQ_count = 0;
249         int new_IRQ_count;
250         int force_IRQ=0;
251
252
253         /* this thread was marked active by xpc_hb_init() */
254
255         daemonize(XPC_HB_CHECK_THREAD_NAME);
256
257         set_cpus_allowed(current, cpumask_of_cpu(XPC_HB_CHECK_CPU));
258
259         xpc_hb_check_timeout = jiffies + (xpc_hb_check_interval * HZ);
260
261         while (!(volatile int) xpc_exiting) {
262
263                 dev_dbg(xpc_part, "woke up with %d ticks rem; %d IRQs have "
264                         "been received\n",
265                         (int) (xpc_hb_check_timeout - jiffies),
266                         atomic_read(&xpc_act_IRQ_rcvd) - last_IRQ_count);
267
268
269                 /* checking of remote heartbeats is skewed by IRQ handling */
270                 if (jiffies >= xpc_hb_check_timeout) {
271                         dev_dbg(xpc_part, "checking remote heartbeats\n");
272                         xpc_check_remote_hb();
273
274                         /*
275                          * We need to periodically recheck to ensure no
276                          * IPI/AMO pairs have been missed.  That check
277                          * must always reset xpc_hb_check_timeout.
278                          */
279                         force_IRQ = 1;
280                 }
281
282
283                 /* check for outstanding IRQs */
284                 new_IRQ_count = atomic_read(&xpc_act_IRQ_rcvd);
285                 if (last_IRQ_count < new_IRQ_count || force_IRQ != 0) {
286                         force_IRQ = 0;
287
288                         dev_dbg(xpc_part, "found an IRQ to process; will be "
289                                 "resetting xpc_hb_check_timeout\n");
290
291                         last_IRQ_count += xpc_identify_act_IRQ_sender();
292                         if (last_IRQ_count < new_IRQ_count) {
293                                 /* retry once to help avoid missing AMO */
294                                 (void) xpc_identify_act_IRQ_sender();
295                         }
296                         last_IRQ_count = new_IRQ_count;
297
298                         xpc_hb_check_timeout = jiffies +
299                                            (xpc_hb_check_interval * HZ);
300                 }
301
302                 /* wait for IRQ or timeout */
303                 (void) wait_event_interruptible(xpc_act_IRQ_wq,
304                             (last_IRQ_count < atomic_read(&xpc_act_IRQ_rcvd) ||
305                                         jiffies >= xpc_hb_check_timeout ||
306                                                 (volatile int) xpc_exiting));
307         }
308
309         dev_dbg(xpc_part, "heartbeat checker is exiting\n");
310
311
312         /* mark this thread as having exited */
313         up(&xpc_hb_checker_exited);
314         return 0;
315 }
316
317
318 /*
319  * This thread will attempt to discover other partitions to activate
320  * based on info provided by SAL. This new thread is short lived and
321  * will exit once discovery is complete.
322  */
323 static int
324 xpc_initiate_discovery(void *ignore)
325 {
326         daemonize(XPC_DISCOVERY_THREAD_NAME);
327
328         xpc_discovery();
329
330         dev_dbg(xpc_part, "discovery thread is exiting\n");
331
332         /* mark this thread as having exited */
333         up(&xpc_discovery_exited);
334         return 0;
335 }
336
337
338 /*
339  * Establish first contact with the remote partititon. This involves pulling
340  * the XPC per partition variables from the remote partition and waiting for
341  * the remote partition to pull ours.
342  */
343 static enum xpc_retval
344 xpc_make_first_contact(struct xpc_partition *part)
345 {
346         enum xpc_retval ret;
347
348
349         while ((ret = xpc_pull_remote_vars_part(part)) != xpcSuccess) {
350                 if (ret != xpcRetry) {
351                         XPC_DEACTIVATE_PARTITION(part, ret);
352                         return ret;
353                 }
354
355                 dev_dbg(xpc_chan, "waiting to make first contact with "
356                         "partition %d\n", XPC_PARTID(part));
357
358                 /* wait a 1/4 of a second or so */
359                 (void) msleep_interruptible(250);
360
361                 if (part->act_state == XPC_P_DEACTIVATING) {
362                         return part->reason;
363                 }
364         }
365
366         return xpc_mark_partition_active(part);
367 }
368
369
370 /*
371  * The first kthread assigned to a newly activated partition is the one
372  * created by XPC HB with which it calls xpc_partition_up(). XPC hangs on to
373  * that kthread until the partition is brought down, at which time that kthread
374  * returns back to XPC HB. (The return of that kthread will signify to XPC HB
375  * that XPC has dismantled all communication infrastructure for the associated
376  * partition.) This kthread becomes the channel manager for that partition.
377  *
378  * Each active partition has a channel manager, who, besides connecting and
379  * disconnecting channels, will ensure that each of the partition's connected
380  * channels has the required number of assigned kthreads to get the work done.
381  */
382 static void
383 xpc_channel_mgr(struct xpc_partition *part)
384 {
385         while (part->act_state != XPC_P_DEACTIVATING ||
386                         atomic_read(&part->nchannels_active) > 0 ||
387                                         !xpc_partition_disengaged(part)) {
388
389                 xpc_process_channel_activity(part);
390
391
392                 /*
393                  * Wait until we've been requested to activate kthreads or
394                  * all of the channel's message queues have been torn down or
395                  * a signal is pending.
396                  *
397                  * The channel_mgr_requests is set to 1 after being awakened,
398                  * This is done to prevent the channel mgr from making one pass
399                  * through the loop for each request, since he will
400                  * be servicing all the requests in one pass. The reason it's
401                  * set to 1 instead of 0 is so that other kthreads will know
402                  * that the channel mgr is running and won't bother trying to
403                  * wake him up.
404                  */
405                 atomic_dec(&part->channel_mgr_requests);
406                 (void) wait_event_interruptible(part->channel_mgr_wq,
407                                 (atomic_read(&part->channel_mgr_requests) > 0 ||
408                                 (volatile u64) part->local_IPI_amo != 0 ||
409                                 ((volatile u8) part->act_state ==
410                                                         XPC_P_DEACTIVATING &&
411                                 atomic_read(&part->nchannels_active) == 0 &&
412                                 xpc_partition_disengaged(part))));
413                 atomic_set(&part->channel_mgr_requests, 1);
414
415                 // >>> Does it need to wakeup periodically as well? In case we
416                 // >>> miscalculated the #of kthreads to wakeup or create?
417         }
418 }
419
420
421 /*
422  * When XPC HB determines that a partition has come up, it will create a new
423  * kthread and that kthread will call this function to attempt to set up the
424  * basic infrastructure used for Cross Partition Communication with the newly
425  * upped partition.
426  *
427  * The kthread that was created by XPC HB and which setup the XPC
428  * infrastructure will remain assigned to the partition until the partition
429  * goes down. At which time the kthread will teardown the XPC infrastructure
430  * and then exit.
431  *
432  * XPC HB will put the remote partition's XPC per partition specific variables
433  * physical address into xpc_partitions[partid].remote_vars_part_pa prior to
434  * calling xpc_partition_up().
435  */
436 static void
437 xpc_partition_up(struct xpc_partition *part)
438 {
439         DBUG_ON(part->channels != NULL);
440
441         dev_dbg(xpc_chan, "activating partition %d\n", XPC_PARTID(part));
442
443         if (xpc_setup_infrastructure(part) != xpcSuccess) {
444                 return;
445         }
446
447         /*
448          * The kthread that XPC HB called us with will become the
449          * channel manager for this partition. It will not return
450          * back to XPC HB until the partition's XPC infrastructure
451          * has been dismantled.
452          */
453
454         (void) xpc_part_ref(part);      /* this will always succeed */
455
456         if (xpc_make_first_contact(part) == xpcSuccess) {
457                 xpc_channel_mgr(part);
458         }
459
460         xpc_part_deref(part);
461
462         xpc_teardown_infrastructure(part);
463 }
464
465
466 static int
467 xpc_activating(void *__partid)
468 {
469         partid_t partid = (u64) __partid;
470         struct xpc_partition *part = &xpc_partitions[partid];
471         unsigned long irq_flags;
472         struct sched_param param = { sched_priority: MAX_RT_PRIO - 1 };
473         int ret;
474
475
476         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
477
478         spin_lock_irqsave(&part->act_lock, irq_flags);
479
480         if (part->act_state == XPC_P_DEACTIVATING) {
481                 part->act_state = XPC_P_INACTIVE;
482                 spin_unlock_irqrestore(&part->act_lock, irq_flags);
483                 part->remote_rp_pa = 0;
484                 return 0;
485         }
486
487         /* indicate the thread is activating */
488         DBUG_ON(part->act_state != XPC_P_ACTIVATION_REQ);
489         part->act_state = XPC_P_ACTIVATING;
490
491         XPC_SET_REASON(part, 0, 0);
492         spin_unlock_irqrestore(&part->act_lock, irq_flags);
493
494         dev_dbg(xpc_part, "bringing partition %d up\n", partid);
495
496         daemonize("xpc%02d", partid);
497
498         /*
499          * This thread needs to run at a realtime priority to prevent a
500          * significant performance degradation.
501          */
502         ret = sched_setscheduler(current, SCHED_FIFO, &param);
503         if (ret != 0) {
504                 dev_warn(xpc_part, "unable to set pid %d to a realtime "
505                         "priority, ret=%d\n", current->pid, ret);
506         }
507
508         /* allow this thread and its children to run on any CPU */
509         set_cpus_allowed(current, CPU_MASK_ALL);
510
511         /*
512          * Register the remote partition's AMOs with SAL so it can handle
513          * and cleanup errors within that address range should the remote
514          * partition go down. We don't unregister this range because it is
515          * difficult to tell when outstanding writes to the remote partition
516          * are finished and thus when it is safe to unregister. This should
517          * not result in wasted space in the SAL xp_addr_region table because
518          * we should get the same page for remote_amos_page_pa after module
519          * reloads and system reboots.
520          */
521         if (sn_register_xp_addr_region(part->remote_amos_page_pa,
522                                                         PAGE_SIZE, 1) < 0) {
523                 dev_warn(xpc_part, "xpc_partition_up(%d) failed to register "
524                         "xp_addr region\n", partid);
525
526                 spin_lock_irqsave(&part->act_lock, irq_flags);
527                 part->act_state = XPC_P_INACTIVE;
528                 XPC_SET_REASON(part, xpcPhysAddrRegFailed, __LINE__);
529                 spin_unlock_irqrestore(&part->act_lock, irq_flags);
530                 part->remote_rp_pa = 0;
531                 return 0;
532         }
533
534         xpc_allow_hb(partid, xpc_vars);
535         xpc_IPI_send_activated(part);
536
537
538         /*
539          * xpc_partition_up() holds this thread and marks this partition as
540          * XPC_P_ACTIVE by calling xpc_hb_mark_active().
541          */
542         (void) xpc_partition_up(part);
543
544         xpc_disallow_hb(partid, xpc_vars);
545         xpc_mark_partition_inactive(part);
546
547         if (part->reason == xpcReactivating) {
548                 /* interrupting ourselves results in activating partition */
549                 xpc_IPI_send_reactivate(part);
550         }
551
552         return 0;
553 }
554
555
556 void
557 xpc_activate_partition(struct xpc_partition *part)
558 {
559         partid_t partid = XPC_PARTID(part);
560         unsigned long irq_flags;
561         pid_t pid;
562
563
564         spin_lock_irqsave(&part->act_lock, irq_flags);
565
566         pid = kernel_thread(xpc_activating, (void *) ((u64) partid), 0);
567
568         DBUG_ON(part->act_state != XPC_P_INACTIVE);
569
570         if (pid > 0) {
571                 part->act_state = XPC_P_ACTIVATION_REQ;
572                 XPC_SET_REASON(part, xpcCloneKThread, __LINE__);
573         } else {
574                 XPC_SET_REASON(part, xpcCloneKThreadFailed, __LINE__);
575         }
576
577         spin_unlock_irqrestore(&part->act_lock, irq_flags);
578 }
579
580
581 /*
582  * Handle the receipt of a SGI_XPC_NOTIFY IRQ by seeing whether the specified
583  * partition actually sent it. Since SGI_XPC_NOTIFY IRQs may be shared by more
584  * than one partition, we use an AMO_t structure per partition to indicate
585  * whether a partition has sent an IPI or not.  >>> If it has, then wake up the
586  * associated kthread to handle it.
587  *
588  * All SGI_XPC_NOTIFY IRQs received by XPC are the result of IPIs sent by XPC
589  * running on other partitions.
590  *
591  * Noteworthy Arguments:
592  *
593  *      irq - Interrupt ReQuest number. NOT USED.
594  *
595  *      dev_id - partid of IPI's potential sender.
596  *
597  *      regs - processor's context before the processor entered
598  *             interrupt code. NOT USED.
599  */
600 irqreturn_t
601 xpc_notify_IRQ_handler(int irq, void *dev_id, struct pt_regs *regs)
602 {
603         partid_t partid = (partid_t) (u64) dev_id;
604         struct xpc_partition *part = &xpc_partitions[partid];
605
606
607         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
608
609         if (xpc_part_ref(part)) {
610                 xpc_check_for_channel_activity(part);
611
612                 xpc_part_deref(part);
613         }
614         return IRQ_HANDLED;
615 }
616
617
618 /*
619  * Check to see if xpc_notify_IRQ_handler() dropped any IPIs on the floor
620  * because the write to their associated IPI amo completed after the IRQ/IPI
621  * was received.
622  */
623 void
624 xpc_dropped_IPI_check(struct xpc_partition *part)
625 {
626         if (xpc_part_ref(part)) {
627                 xpc_check_for_channel_activity(part);
628
629                 part->dropped_IPI_timer.expires = jiffies +
630                                                         XPC_P_DROPPED_IPI_WAIT;
631                 add_timer(&part->dropped_IPI_timer);
632                 xpc_part_deref(part);
633         }
634 }
635
636
637 void
638 xpc_activate_kthreads(struct xpc_channel *ch, int needed)
639 {
640         int idle = atomic_read(&ch->kthreads_idle);
641         int assigned = atomic_read(&ch->kthreads_assigned);
642         int wakeup;
643
644
645         DBUG_ON(needed <= 0);
646
647         if (idle > 0) {
648                 wakeup = (needed > idle) ? idle : needed;
649                 needed -= wakeup;
650
651                 dev_dbg(xpc_chan, "wakeup %d idle kthreads, partid=%d, "
652                         "channel=%d\n", wakeup, ch->partid, ch->number);
653
654                 /* only wakeup the requested number of kthreads */
655                 wake_up_nr(&ch->idle_wq, wakeup);
656         }
657
658         if (needed <= 0) {
659                 return;
660         }
661
662         if (needed + assigned > ch->kthreads_assigned_limit) {
663                 needed = ch->kthreads_assigned_limit - assigned;
664                 // >>>should never be less than 0
665                 if (needed <= 0) {
666                         return;
667                 }
668         }
669
670         dev_dbg(xpc_chan, "create %d new kthreads, partid=%d, channel=%d\n",
671                 needed, ch->partid, ch->number);
672
673         xpc_create_kthreads(ch, needed);
674 }
675
676
677 /*
678  * This function is where XPC's kthreads wait for messages to deliver.
679  */
680 static void
681 xpc_kthread_waitmsgs(struct xpc_partition *part, struct xpc_channel *ch)
682 {
683         do {
684                 /* deliver messages to their intended recipients */
685
686                 while ((volatile s64) ch->w_local_GP.get <
687                                 (volatile s64) ch->w_remote_GP.put &&
688                                         !((volatile u32) ch->flags &
689                                                 XPC_C_DISCONNECTING)) {
690                         xpc_deliver_msg(ch);
691                 }
692
693                 if (atomic_inc_return(&ch->kthreads_idle) >
694                                                 ch->kthreads_idle_limit) {
695                         /* too many idle kthreads on this channel */
696                         atomic_dec(&ch->kthreads_idle);
697                         break;
698                 }
699
700                 dev_dbg(xpc_chan, "idle kthread calling "
701                         "wait_event_interruptible_exclusive()\n");
702
703                 (void) wait_event_interruptible_exclusive(ch->idle_wq,
704                                 ((volatile s64) ch->w_local_GP.get <
705                                         (volatile s64) ch->w_remote_GP.put ||
706                                 ((volatile u32) ch->flags &
707                                                 XPC_C_DISCONNECTING)));
708
709                 atomic_dec(&ch->kthreads_idle);
710
711         } while (!((volatile u32) ch->flags & XPC_C_DISCONNECTING));
712 }
713
714
715 static int
716 xpc_daemonize_kthread(void *args)
717 {
718         partid_t partid = XPC_UNPACK_ARG1(args);
719         u16 ch_number = XPC_UNPACK_ARG2(args);
720         struct xpc_partition *part = &xpc_partitions[partid];
721         struct xpc_channel *ch;
722         int n_needed;
723         unsigned long irq_flags;
724
725
726         daemonize("xpc%02dc%d", partid, ch_number);
727
728         dev_dbg(xpc_chan, "kthread starting, partid=%d, channel=%d\n",
729                 partid, ch_number);
730
731         ch = &part->channels[ch_number];
732
733         if (!(ch->flags & XPC_C_DISCONNECTING)) {
734
735                 /* let registerer know that connection has been established */
736
737                 spin_lock_irqsave(&ch->lock, irq_flags);
738                 if (!(ch->flags & XPC_C_CONNECTCALLOUT)) {
739                         ch->flags |= XPC_C_CONNECTCALLOUT;
740                         spin_unlock_irqrestore(&ch->lock, irq_flags);
741
742                         xpc_connected_callout(ch);
743
744                         /*
745                          * It is possible that while the callout was being
746                          * made that the remote partition sent some messages.
747                          * If that is the case, we may need to activate
748                          * additional kthreads to help deliver them. We only
749                          * need one less than total #of messages to deliver.
750                          */
751                         n_needed = ch->w_remote_GP.put - ch->w_local_GP.get - 1;
752                         if (n_needed > 0 &&
753                                         !(ch->flags & XPC_C_DISCONNECTING)) {
754                                 xpc_activate_kthreads(ch, n_needed);
755                         }
756                 } else {
757                         spin_unlock_irqrestore(&ch->lock, irq_flags);
758                 }
759
760                 xpc_kthread_waitmsgs(part, ch);
761         }
762
763         if (atomic_dec_return(&ch->kthreads_assigned) == 0) {
764                 spin_lock_irqsave(&ch->lock, irq_flags);
765                 if ((ch->flags & XPC_C_CONNECTCALLOUT) &&
766                                 !(ch->flags & XPC_C_DISCONNECTCALLOUT)) {
767                         ch->flags |= XPC_C_DISCONNECTCALLOUT;
768                         spin_unlock_irqrestore(&ch->lock, irq_flags);
769
770                         xpc_disconnecting_callout(ch);
771                 } else {
772                         spin_unlock_irqrestore(&ch->lock, irq_flags);
773                 }
774                 if (atomic_dec_return(&part->nchannels_engaged) == 0) {
775                         xpc_mark_partition_disengaged(part);
776                         xpc_IPI_send_disengage(part);
777                 }
778         }
779
780
781         xpc_msgqueue_deref(ch);
782
783         dev_dbg(xpc_chan, "kthread exiting, partid=%d, channel=%d\n",
784                 partid, ch_number);
785
786         xpc_part_deref(part);
787         return 0;
788 }
789
790
791 /*
792  * For each partition that XPC has established communications with, there is
793  * a minimum of one kernel thread assigned to perform any operation that
794  * may potentially sleep or block (basically the callouts to the asynchronous
795  * functions registered via xpc_connect()).
796  *
797  * Additional kthreads are created and destroyed by XPC as the workload
798  * demands.
799  *
800  * A kthread is assigned to one of the active channels that exists for a given
801  * partition.
802  */
803 void
804 xpc_create_kthreads(struct xpc_channel *ch, int needed)
805 {
806         unsigned long irq_flags;
807         pid_t pid;
808         u64 args = XPC_PACK_ARGS(ch->partid, ch->number);
809         struct xpc_partition *part = &xpc_partitions[ch->partid];
810
811
812         while (needed-- > 0) {
813
814                 /*
815                  * The following is done on behalf of the newly created
816                  * kthread. That kthread is responsible for doing the
817                  * counterpart to the following before it exits.
818                  */
819                 (void) xpc_part_ref(part);
820                 xpc_msgqueue_ref(ch);
821                 if (atomic_inc_return(&ch->kthreads_assigned) == 1 &&
822                     atomic_inc_return(&part->nchannels_engaged) == 1) {
823                         xpc_mark_partition_engaged(part);
824                 }
825
826                 pid = kernel_thread(xpc_daemonize_kthread, (void *) args, 0);
827                 if (pid < 0) {
828                         /* the fork failed */
829                         if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
830                             atomic_dec_return(&part->nchannels_engaged) == 0) {
831                                 xpc_mark_partition_disengaged(part);
832                                 xpc_IPI_send_disengage(part);
833                         }
834                         xpc_msgqueue_deref(ch);
835                         xpc_part_deref(part);
836
837                         if (atomic_read(&ch->kthreads_assigned) <
838                                                 ch->kthreads_idle_limit) {
839                                 /*
840                                  * Flag this as an error only if we have an
841                                  * insufficient #of kthreads for the channel
842                                  * to function.
843                                  *
844                                  * No xpc_msgqueue_ref() is needed here since
845                                  * the channel mgr is doing this.
846                                  */
847                                 spin_lock_irqsave(&ch->lock, irq_flags);
848                                 XPC_DISCONNECT_CHANNEL(ch, xpcLackOfResources,
849                                                                 &irq_flags);
850                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
851                         }
852                         break;
853                 }
854
855                 ch->kthreads_created++; // >>> temporary debug only!!!
856         }
857 }
858
859
860 void
861 xpc_disconnect_wait(int ch_number)
862 {
863         unsigned long irq_flags;
864         partid_t partid;
865         struct xpc_partition *part;
866         struct xpc_channel *ch;
867         int wakeup_channel_mgr;
868
869
870         /* now wait for all callouts to the caller's function to cease */
871         for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
872                 part = &xpc_partitions[partid];
873
874                 if (!xpc_part_ref(part)) {
875                         continue;
876                 }
877
878                 ch = &part->channels[ch_number];
879
880                 if (!(ch->flags & XPC_C_WDISCONNECT)) {
881                         xpc_part_deref(part);
882                         continue;
883                 }
884
885                 (void) down(&ch->wdisconnect_sema);
886
887                 spin_lock_irqsave(&ch->lock, irq_flags);
888                 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
889                 wakeup_channel_mgr = 0;
890
891                 if (ch->delayed_IPI_flags) {
892                         if (part->act_state != XPC_P_DEACTIVATING) {
893                                 spin_lock(&part->IPI_lock);
894                                 XPC_SET_IPI_FLAGS(part->local_IPI_amo,
895                                         ch->number, ch->delayed_IPI_flags);
896                                 spin_unlock(&part->IPI_lock);
897                                 wakeup_channel_mgr = 1;
898                         }
899                         ch->delayed_IPI_flags = 0;
900                 }
901
902                 ch->flags &= ~XPC_C_WDISCONNECT;
903                 spin_unlock_irqrestore(&ch->lock, irq_flags);
904
905                 if (wakeup_channel_mgr) {
906                         xpc_wakeup_channel_mgr(part);
907                 }
908
909                 xpc_part_deref(part);
910         }
911 }
912
913
914 static void
915 xpc_do_exit(enum xpc_retval reason)
916 {
917         partid_t partid;
918         int active_part_count;
919         struct xpc_partition *part;
920         unsigned long printmsg_time;
921
922
923         /* a 'rmmod XPC' and a 'reboot' cannot both end up here together */
924         DBUG_ON(xpc_exiting == 1);
925
926         /*
927          * Let the heartbeat checker thread and the discovery thread
928          * (if one is running) know that they should exit. Also wake up
929          * the heartbeat checker thread in case it's sleeping.
930          */
931         xpc_exiting = 1;
932         wake_up_interruptible(&xpc_act_IRQ_wq);
933
934         /* ignore all incoming interrupts */
935         free_irq(SGI_XPC_ACTIVATE, NULL);
936
937         /* wait for the discovery thread to exit */
938         down(&xpc_discovery_exited);
939
940         /* wait for the heartbeat checker thread to exit */
941         down(&xpc_hb_checker_exited);
942
943
944         /* sleep for a 1/3 of a second or so */
945         (void) msleep_interruptible(300);
946
947
948         /* wait for all partitions to become inactive */
949
950         printmsg_time = jiffies;
951
952         do {
953                 active_part_count = 0;
954
955                 for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
956                         part = &xpc_partitions[partid];
957
958                         if (xpc_partition_disengaged(part) &&
959                                         part->act_state == XPC_P_INACTIVE) {
960                                 continue;
961                         }
962
963                         active_part_count++;
964
965                         XPC_DEACTIVATE_PARTITION(part, reason);
966                 }
967
968                 if (active_part_count == 0) {
969                         break;
970                 }
971
972                 if (jiffies >= printmsg_time) {
973                         dev_info(xpc_part, "waiting for partitions to "
974                                 "deactivate/disengage, active count=%d, remote "
975                                 "engaged=0x%lx\n", active_part_count,
976                                 xpc_partition_engaged(1UL << partid));
977
978                         printmsg_time = jiffies +
979                                         (XPC_DISENGAGE_PRINTMSG_INTERVAL * HZ);
980                 }
981
982                 /* sleep for a 1/3 of a second or so */
983                 (void) msleep_interruptible(300);
984
985         } while (1);
986
987         DBUG_ON(xpc_partition_engaged(-1UL));
988
989
990         /* indicate to others that our reserved page is uninitialized */
991         xpc_rsvd_page->vars_pa = 0;
992
993         /* now it's time to eliminate our heartbeat */
994         del_timer_sync(&xpc_hb_timer);
995         DBUG_ON(xpc_vars->heartbeating_to_mask != 0);
996
997         /* take ourselves off of the reboot_notifier_list */
998         (void) unregister_reboot_notifier(&xpc_reboot_notifier);
999
1000         /* close down protections for IPI operations */
1001         xpc_restrict_IPI_ops();
1002
1003
1004         /* clear the interface to XPC's functions */
1005         xpc_clear_interface();
1006
1007         if (xpc_sysctl) {
1008                 unregister_sysctl_table(xpc_sysctl);
1009         }
1010 }
1011
1012
1013 /*
1014  * This function is called when the system is being rebooted.
1015  */
1016 static int
1017 xpc_system_reboot(struct notifier_block *nb, unsigned long event, void *unused)
1018 {
1019         enum xpc_retval reason;
1020
1021
1022         switch (event) {
1023         case SYS_RESTART:
1024                 reason = xpcSystemReboot;
1025                 break;
1026         case SYS_HALT:
1027                 reason = xpcSystemHalt;
1028                 break;
1029         case SYS_POWER_OFF:
1030                 reason = xpcSystemPoweroff;
1031                 break;
1032         default:
1033                 reason = xpcSystemGoingDown;
1034         }
1035
1036         xpc_do_exit(reason);
1037         return NOTIFY_DONE;
1038 }
1039
1040
1041 int __init
1042 xpc_init(void)
1043 {
1044         int ret;
1045         partid_t partid;
1046         struct xpc_partition *part;
1047         pid_t pid;
1048
1049
1050         if (!ia64_platform_is("sn2")) {
1051                 return -ENODEV;
1052         }
1053
1054         /*
1055          * xpc_remote_copy_buffer is used as a temporary buffer for bte_copy'ng
1056          * various portions of a partition's reserved page. Its size is based
1057          * on the size of the reserved page header and part_nasids mask. So we
1058          * need to ensure that the other items will fit as well.
1059          */
1060         if (XPC_RP_VARS_SIZE > XPC_RP_HEADER_SIZE + XP_NASID_MASK_BYTES) {
1061                 dev_err(xpc_part, "xpc_remote_copy_buffer is not big enough\n");
1062                 return -EPERM;
1063         }
1064         DBUG_ON((u64) xpc_remote_copy_buffer !=
1065                                 L1_CACHE_ALIGN((u64) xpc_remote_copy_buffer));
1066
1067         snprintf(xpc_part->bus_id, BUS_ID_SIZE, "part");
1068         snprintf(xpc_chan->bus_id, BUS_ID_SIZE, "chan");
1069
1070         xpc_sysctl = register_sysctl_table(xpc_sys_dir, 1);
1071
1072         /*
1073          * The first few fields of each entry of xpc_partitions[] need to
1074          * be initialized now so that calls to xpc_connect() and
1075          * xpc_disconnect() can be made prior to the activation of any remote
1076          * partition. NOTE THAT NONE OF THE OTHER FIELDS BELONGING TO THESE
1077          * ENTRIES ARE MEANINGFUL UNTIL AFTER AN ENTRY'S CORRESPONDING
1078          * PARTITION HAS BEEN ACTIVATED.
1079          */
1080         for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
1081                 part = &xpc_partitions[partid];
1082
1083                 DBUG_ON((u64) part != L1_CACHE_ALIGN((u64) part));
1084
1085                 part->act_IRQ_rcvd = 0;
1086                 spin_lock_init(&part->act_lock);
1087                 part->act_state = XPC_P_INACTIVE;
1088                 XPC_SET_REASON(part, 0, 0);
1089
1090                 init_timer(&part->disengage_request_timer);
1091                 part->disengage_request_timer.function =
1092                                 xpc_timeout_partition_disengage_request;
1093                 part->disengage_request_timer.data = (unsigned long) part;
1094
1095                 part->setup_state = XPC_P_UNSET;
1096                 init_waitqueue_head(&part->teardown_wq);
1097                 atomic_set(&part->references, 0);
1098         }
1099
1100         /*
1101          * Open up protections for IPI operations (and AMO operations on
1102          * Shub 1.1 systems).
1103          */
1104         xpc_allow_IPI_ops();
1105
1106         /*
1107          * Interrupts being processed will increment this atomic variable and
1108          * awaken the heartbeat thread which will process the interrupts.
1109          */
1110         atomic_set(&xpc_act_IRQ_rcvd, 0);
1111
1112         /*
1113          * This is safe to do before the xpc_hb_checker thread has started
1114          * because the handler releases a wait queue.  If an interrupt is
1115          * received before the thread is waiting, it will not go to sleep,
1116          * but rather immediately process the interrupt.
1117          */
1118         ret = request_irq(SGI_XPC_ACTIVATE, xpc_act_IRQ_handler, 0,
1119                                                         "xpc hb", NULL);
1120         if (ret != 0) {
1121                 dev_err(xpc_part, "can't register ACTIVATE IRQ handler, "
1122                         "errno=%d\n", -ret);
1123
1124                 xpc_restrict_IPI_ops();
1125
1126                 if (xpc_sysctl) {
1127                         unregister_sysctl_table(xpc_sysctl);
1128                 }
1129                 return -EBUSY;
1130         }
1131
1132         /*
1133          * Fill the partition reserved page with the information needed by
1134          * other partitions to discover we are alive and establish initial
1135          * communications.
1136          */
1137         xpc_rsvd_page = xpc_rsvd_page_init();
1138         if (xpc_rsvd_page == NULL) {
1139                 dev_err(xpc_part, "could not setup our reserved page\n");
1140
1141                 free_irq(SGI_XPC_ACTIVATE, NULL);
1142                 xpc_restrict_IPI_ops();
1143
1144                 if (xpc_sysctl) {
1145                         unregister_sysctl_table(xpc_sysctl);
1146                 }
1147                 return -EBUSY;
1148         }
1149
1150
1151         /* add ourselves to the reboot_notifier_list */
1152         ret = register_reboot_notifier(&xpc_reboot_notifier);
1153         if (ret != 0) {
1154                 dev_warn(xpc_part, "can't register reboot notifier\n");
1155         }
1156
1157
1158         /*
1159          * Set the beating to other partitions into motion.  This is
1160          * the last requirement for other partitions' discovery to
1161          * initiate communications with us.
1162          */
1163         init_timer(&xpc_hb_timer);
1164         xpc_hb_timer.function = xpc_hb_beater;
1165         xpc_hb_beater(0);
1166
1167
1168         /*
1169          * The real work-horse behind xpc.  This processes incoming
1170          * interrupts and monitors remote heartbeats.
1171          */
1172         pid = kernel_thread(xpc_hb_checker, NULL, 0);
1173         if (pid < 0) {
1174                 dev_err(xpc_part, "failed while forking hb check thread\n");
1175
1176                 /* indicate to others that our reserved page is uninitialized */
1177                 xpc_rsvd_page->vars_pa = 0;
1178
1179                 /* take ourselves off of the reboot_notifier_list */
1180                 (void) unregister_reboot_notifier(&xpc_reboot_notifier);
1181
1182                 del_timer_sync(&xpc_hb_timer);
1183                 free_irq(SGI_XPC_ACTIVATE, NULL);
1184                 xpc_restrict_IPI_ops();
1185
1186                 if (xpc_sysctl) {
1187                         unregister_sysctl_table(xpc_sysctl);
1188                 }
1189                 return -EBUSY;
1190         }
1191
1192
1193         /*
1194          * Startup a thread that will attempt to discover other partitions to
1195          * activate based on info provided by SAL. This new thread is short
1196          * lived and will exit once discovery is complete.
1197          */
1198         pid = kernel_thread(xpc_initiate_discovery, NULL, 0);
1199         if (pid < 0) {
1200                 dev_err(xpc_part, "failed while forking discovery thread\n");
1201
1202                 /* mark this new thread as a non-starter */
1203                 up(&xpc_discovery_exited);
1204
1205                 xpc_do_exit(xpcUnloading);
1206                 return -EBUSY;
1207         }
1208
1209
1210         /* set the interface to point at XPC's functions */
1211         xpc_set_interface(xpc_initiate_connect, xpc_initiate_disconnect,
1212                           xpc_initiate_allocate, xpc_initiate_send,
1213                           xpc_initiate_send_notify, xpc_initiate_received,
1214                           xpc_initiate_partid_to_nasids);
1215
1216         return 0;
1217 }
1218 module_init(xpc_init);
1219
1220
1221 void __exit
1222 xpc_exit(void)
1223 {
1224         xpc_do_exit(xpcUnloading);
1225 }
1226 module_exit(xpc_exit);
1227
1228
1229 MODULE_AUTHOR("Silicon Graphics, Inc.");
1230 MODULE_DESCRIPTION("Cross Partition Communication (XPC) support");
1231 MODULE_LICENSE("GPL");
1232
1233 module_param(xpc_hb_interval, int, 0);
1234 MODULE_PARM_DESC(xpc_hb_interval, "Number of seconds between "
1235                 "heartbeat increments.");
1236
1237 module_param(xpc_hb_check_interval, int, 0);
1238 MODULE_PARM_DESC(xpc_hb_check_interval, "Number of seconds between "
1239                 "heartbeat checks.");
1240
1241 module_param(xpc_disengage_request_timelimit, int, 0);
1242 MODULE_PARM_DESC(xpc_disengage_request_timelimit, "Number of seconds to wait "
1243                 "for disengage request to complete.");
1244