/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2006 Silicon Graphics, Inc.  All Rights Reserved.
 */


/*
 * Cross Partition Communication (XPC) channel support.
 *
 *      This is the part of XPC that manages the channels and
 *      sends/receives messages across them to/from other partitions.
 *
 */


#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/cache.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mutex.h>
#include <linux/completion.h>
#include <asm/sn/bte.h>
#include <asm/sn/sn_sal.h>
#include <asm/sn/xpc.h>


/*
 * Set up the initial values for the XPartition Communication channels.
 */
static void
xpc_initialize_channels(struct xpc_partition *part, partid_t partid)
{
        int ch_number;
        struct xpc_channel *ch;


        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];

                ch->partid = partid;
                ch->number = ch_number;
                ch->flags = XPC_C_DISCONNECTED;

                ch->local_GP = &part->local_GPs[ch_number];
                ch->local_openclose_args =
                                        &part->local_openclose_args[ch_number];

                atomic_set(&ch->kthreads_assigned, 0);
                atomic_set(&ch->kthreads_idle, 0);
                atomic_set(&ch->kthreads_active, 0);

                atomic_set(&ch->references, 0);
                atomic_set(&ch->n_to_notify, 0);

                spin_lock_init(&ch->lock);
                mutex_init(&ch->msg_to_pull_mutex);
                init_completion(&ch->wdisconnect_wait);

                atomic_set(&ch->n_on_msg_allocate_wq, 0);
                init_waitqueue_head(&ch->msg_allocate_wq);
                init_waitqueue_head(&ch->idle_wq);
        }
}


/*
 * Set up the infrastructure necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 */
enum xpc_retval
xpc_setup_infrastructure(struct xpc_partition *part)
{
        int ret, cpuid;
        struct timer_list *timer;
        partid_t partid = XPC_PARTID(part);


        /*
         * Zero out MOST of the entry for this partition. Only the fields
         * starting with `nchannels' will be zeroed. The preceding fields must
         * remain `viable' across partition ups and downs, since they may be
         * referenced during this memset() operation.
         */
        memset(&part->nchannels, 0, sizeof(struct xpc_partition) -
                                offsetof(struct xpc_partition, nchannels));

        /*
         * Allocate all of the channel structures as a contiguous chunk of
         * memory.
         */
        part->channels = kmalloc(sizeof(struct xpc_channel) * XPC_NCHANNELS,
                                                                GFP_KERNEL);
        if (part->channels == NULL) {
                dev_err(xpc_chan, "can't get memory for channels\n");
                return xpcNoMemory;
        }
        memset(part->channels, 0, sizeof(struct xpc_channel) * XPC_NCHANNELS);

        part->nchannels = XPC_NCHANNELS;


        /* allocate all the required GET/PUT values */

        part->local_GPs = xpc_kmalloc_cacheline_aligned(XPC_GP_SIZE,
                                        GFP_KERNEL, &part->local_GPs_base);
        if (part->local_GPs == NULL) {
                kfree(part->channels);
                part->channels = NULL;
                dev_err(xpc_chan, "can't get memory for local get/put "
                        "values\n");
                return xpcNoMemory;
        }
        memset(part->local_GPs, 0, XPC_GP_SIZE);

        part->remote_GPs = xpc_kmalloc_cacheline_aligned(XPC_GP_SIZE,
                                        GFP_KERNEL, &part->remote_GPs_base);
        if (part->remote_GPs == NULL) {
                kfree(part->channels);
                part->channels = NULL;
                kfree(part->local_GPs_base);
                part->local_GPs = NULL;
                dev_err(xpc_chan, "can't get memory for remote get/put "
                        "values\n");
                return xpcNoMemory;
        }
        memset(part->remote_GPs, 0, XPC_GP_SIZE);


        /* allocate all the required open and close args */

        part->local_openclose_args = xpc_kmalloc_cacheline_aligned(
                                        XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
                                        &part->local_openclose_args_base);
        if (part->local_openclose_args == NULL) {
                kfree(part->channels);
                part->channels = NULL;
                kfree(part->local_GPs_base);
                part->local_GPs = NULL;
                kfree(part->remote_GPs_base);
                part->remote_GPs = NULL;
                dev_err(xpc_chan, "can't get memory for local connect args\n");
                return xpcNoMemory;
        }
        memset(part->local_openclose_args, 0, XPC_OPENCLOSE_ARGS_SIZE);

        part->remote_openclose_args = xpc_kmalloc_cacheline_aligned(
                                        XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
                                        &part->remote_openclose_args_base);
        if (part->remote_openclose_args == NULL) {
                kfree(part->channels);
                part->channels = NULL;
                kfree(part->local_GPs_base);
                part->local_GPs = NULL;
                kfree(part->remote_GPs_base);
                part->remote_GPs = NULL;
                kfree(part->local_openclose_args_base);
                part->local_openclose_args = NULL;
                dev_err(xpc_chan, "can't get memory for remote connect args\n");
                return xpcNoMemory;
        }
        memset(part->remote_openclose_args, 0, XPC_OPENCLOSE_ARGS_SIZE);


        xpc_initialize_channels(part, partid);

        atomic_set(&part->nchannels_active, 0);
        atomic_set(&part->nchannels_engaged, 0);


        /* local_IPI_amo was set to 0 by an earlier memset() */

        /* Initialize this partition's AMO_t structure */
        part->local_IPI_amo_va = xpc_IPI_init(partid);

        spin_lock_init(&part->IPI_lock);

        atomic_set(&part->channel_mgr_requests, 1);
        init_waitqueue_head(&part->channel_mgr_wq);

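        /*
         * Register the notify IRQ handler. The partid is packed into the
         * dev_id cookie; xpc_teardown_infrastructure() passes this same
         * cookie to free_irq() to unregister the handler.
         */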
        sprintf(part->IPI_owner, "xpc%02d", partid);
        ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler, SA_SHIRQ,
                                part->IPI_owner, (void *) (u64) partid);
        if (ret != 0) {
                kfree(part->channels);
                part->channels = NULL;
                kfree(part->local_GPs_base);
                part->local_GPs = NULL;
                kfree(part->remote_GPs_base);
                part->remote_GPs = NULL;
                kfree(part->local_openclose_args_base);
                part->local_openclose_args = NULL;
                kfree(part->remote_openclose_args_base);
                part->remote_openclose_args = NULL;
                dev_err(xpc_chan, "can't register NOTIFY IRQ handler, "
                        "errno=%d\n", -ret);
                return xpcLackOfResources;
        }

        /* Set up a timer to check for dropped IPIs */
        timer = &part->dropped_IPI_timer;
        init_timer(timer);
        timer->function = (void (*)(unsigned long)) xpc_dropped_IPI_check;
        timer->data = (unsigned long) part;
        timer->expires = jiffies + XPC_P_DROPPED_IPI_WAIT;
        add_timer(timer);

        /*
         * With the setting of the partition setup_state to XPC_P_SETUP, we're
         * declaring that this partition is ready to go.
         */
        part->setup_state = XPC_P_SETUP;


        /*
         * Set up the per-partition specific variables required by the
         * remote partition to establish channel connections with us.
         *
         * The setting of the magic # indicates that these per-partition
         * specific variables are ready to be used.
         */
        xpc_vars_part[partid].GPs_pa = __pa(part->local_GPs);
        xpc_vars_part[partid].openclose_args_pa =
                                        __pa(part->local_openclose_args);
        xpc_vars_part[partid].IPI_amo_pa = __pa(part->local_IPI_amo_va);
        cpuid = raw_smp_processor_id(); /* any CPU in this partition will do */
        xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(cpuid);
        xpc_vars_part[partid].IPI_phys_cpuid = cpu_physical_id(cpuid);
        xpc_vars_part[partid].nchannels = part->nchannels;
        xpc_vars_part[partid].magic = XPC_VP_MAGIC1;

        return xpcSuccess;
}


/*
 * Create a wrapper that hides the underlying mechanism for pulling a cacheline
 * (or multiple cachelines) from a remote partition.
 *
 * src must be a cacheline aligned physical address on the remote partition.
 * dst must be a cacheline aligned virtual address on this partition.
 * cnt must be a multiple of the cacheline size.
 */
static enum xpc_retval
xpc_pull_remote_cachelines(struct xpc_partition *part, void *dst,
                                const void *src, size_t cnt)
{
        bte_result_t bte_ret;


        DBUG_ON((u64) src != L1_CACHE_ALIGN((u64) src));
        DBUG_ON((u64) dst != L1_CACHE_ALIGN((u64) dst));
        DBUG_ON(cnt != L1_CACHE_ALIGN(cnt));

        if (part->act_state == XPC_P_DEACTIVATING) {
                return part->reason;
        }

        bte_ret = xp_bte_copy((u64) src, (u64) ia64_tpa((u64) dst),
                                (u64) cnt, (BTE_NORMAL | BTE_WACQUIRE), NULL);
        if (bte_ret == BTE_SUCCESS) {
                return xpcSuccess;
        }

        dev_dbg(xpc_chan, "xp_bte_copy() from partition %d failed, ret=%d\n",
                XPC_PARTID(part), bte_ret);

        return xpc_map_bte_errors(bte_ret);
}


/*
 * Pull the remote per-partition specific variables from the specified
 * partition.
 */
enum xpc_retval
xpc_pull_remote_vars_part(struct xpc_partition *part)
{
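        /*
         * The stack buffer is twice the cacheline size so that a whole,
         * cacheline-aligned pull buffer can always be carved out of it.
         */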
        u8 buffer[L1_CACHE_BYTES * 2];
        struct xpc_vars_part *pulled_entry_cacheline =
                        (struct xpc_vars_part *) L1_CACHE_ALIGN((u64) buffer);
        struct xpc_vars_part *pulled_entry;
        u64 remote_entry_cacheline_pa, remote_entry_pa;
        partid_t partid = XPC_PARTID(part);
        enum xpc_retval ret;


        /* pull the cacheline that contains the variables we're interested in */

        DBUG_ON(part->remote_vars_part_pa !=
                                L1_CACHE_ALIGN(part->remote_vars_part_pa));
        DBUG_ON(sizeof(struct xpc_vars_part) != L1_CACHE_BYTES / 2);

        remote_entry_pa = part->remote_vars_part_pa +
                        sn_partition_id * sizeof(struct xpc_vars_part);

        remote_entry_cacheline_pa = (remote_entry_pa & ~(L1_CACHE_BYTES - 1));

        pulled_entry = (struct xpc_vars_part *) ((u64) pulled_entry_cacheline +
                                (remote_entry_pa & (L1_CACHE_BYTES - 1)));

        ret = xpc_pull_remote_cachelines(part, pulled_entry_cacheline,
                                        (void *) remote_entry_cacheline_pa,
                                        L1_CACHE_BYTES);
        if (ret != xpcSuccess) {
                dev_dbg(xpc_chan, "failed to pull XPC vars_part from "
                        "partition %d, ret=%d\n", partid, ret);
                return ret;
        }


        /* see if they've been set up yet */

        if (pulled_entry->magic != XPC_VP_MAGIC1 &&
                                pulled_entry->magic != XPC_VP_MAGIC2) {

                if (pulled_entry->magic != 0) {
                        dev_dbg(xpc_chan, "partition %d's XPC vars_part for "
                                "partition %d has bad magic value (=0x%lx)\n",
                                partid, sn_partition_id, pulled_entry->magic);
                        return xpcBadMagic;
                }

                /* they've not been initialized yet */
                return xpcRetry;
        }

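        /*
         * XPC_VP_MAGIC1 means a partition has published its vars_part entry;
         * XPC_VP_MAGIC2 means it has also pulled its peer's entry. Once the
         * remote variables below are validated and cached, our own magic is
         * advanced to XPC_VP_MAGIC2 to tell the other side we've pulled theirs.
         */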
        if (xpc_vars_part[partid].magic == XPC_VP_MAGIC1) {

                /* validate the variables */

                if (pulled_entry->GPs_pa == 0 ||
                                pulled_entry->openclose_args_pa == 0 ||
                                        pulled_entry->IPI_amo_pa == 0) {

                        dev_err(xpc_chan, "partition %d's XPC vars_part for "
                                "partition %d are not valid\n", partid,
                                sn_partition_id);
                        return xpcInvalidAddress;
                }

                /* the variables we imported look to be valid */

                part->remote_GPs_pa = pulled_entry->GPs_pa;
                part->remote_openclose_args_pa =
                                        pulled_entry->openclose_args_pa;
                part->remote_IPI_amo_va =
                                      (AMO_t *) __va(pulled_entry->IPI_amo_pa);
                part->remote_IPI_nasid = pulled_entry->IPI_nasid;
                part->remote_IPI_phys_cpuid = pulled_entry->IPI_phys_cpuid;

                if (part->nchannels > pulled_entry->nchannels) {
                        part->nchannels = pulled_entry->nchannels;
                }

                /* let the other side know that we've pulled their variables */

                xpc_vars_part[partid].magic = XPC_VP_MAGIC2;
        }

        if (pulled_entry->magic == XPC_VP_MAGIC1) {
                return xpcRetry;
        }

        return xpcSuccess;
}


/*
 * Get the IPI flags and pull the openclose args and/or remote GPs as needed.
 */
static u64
xpc_get_IPI_flags(struct xpc_partition *part)
{
        unsigned long irq_flags;
        u64 IPI_amo;
        enum xpc_retval ret;


        /*
         * See if there are any IPI flags to be handled.
         */

        spin_lock_irqsave(&part->IPI_lock, irq_flags);
        if ((IPI_amo = part->local_IPI_amo) != 0) {
                part->local_IPI_amo = 0;
        }
        spin_unlock_irqrestore(&part->IPI_lock, irq_flags);


        if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_amo)) {
                ret = xpc_pull_remote_cachelines(part,
                                        part->remote_openclose_args,
                                        (void *) part->remote_openclose_args_pa,
                                        XPC_OPENCLOSE_ARGS_SIZE);
                if (ret != xpcSuccess) {
                        XPC_DEACTIVATE_PARTITION(part, ret);

                        dev_dbg(xpc_chan, "failed to pull openclose args from "
                                "partition %d, ret=%d\n", XPC_PARTID(part),
                                ret);

                        /* don't bother processing IPIs anymore */
                        IPI_amo = 0;
                }
        }

        if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_amo)) {
                ret = xpc_pull_remote_cachelines(part, part->remote_GPs,
                                                (void *) part->remote_GPs_pa,
                                                XPC_GP_SIZE);
                if (ret != xpcSuccess) {
                        XPC_DEACTIVATE_PARTITION(part, ret);

                        dev_dbg(xpc_chan, "failed to pull GPs from partition "
                                "%d, ret=%d\n", XPC_PARTID(part), ret);

                        /* don't bother processing IPIs anymore */
                        IPI_amo = 0;
                }
        }

        return IPI_amo;
}


/*
 * Allocate the local message queue and the notify queue.
 */
static enum xpc_retval
xpc_allocate_local_msgqueue(struct xpc_channel *ch)
{
        unsigned long irq_flags;
        int nentries;
        size_t nbytes;


        // >>> may want to check for ch->flags & XPC_C_DISCONNECTING between
        // >>> iterations of the for-loop, bail if set?

        // >>> should we impose a minimum # of entries? like 4 or 8?
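        /* try progressively smaller queue sizes until an allocation succeeds */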
        for (nentries = ch->local_nentries; nentries > 0; nentries--) {

                nbytes = nentries * ch->msg_size;
                ch->local_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
                                                GFP_KERNEL,
                                                &ch->local_msgqueue_base);
                if (ch->local_msgqueue == NULL) {
                        continue;
                }
                memset(ch->local_msgqueue, 0, nbytes);

                nbytes = nentries * sizeof(struct xpc_notify);
                ch->notify_queue = kmalloc(nbytes, GFP_KERNEL);
                if (ch->notify_queue == NULL) {
                        kfree(ch->local_msgqueue_base);
                        ch->local_msgqueue = NULL;
                        continue;
                }
                memset(ch->notify_queue, 0, nbytes);

                spin_lock_irqsave(&ch->lock, irq_flags);
                if (nentries < ch->local_nentries) {
                        dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
                                "partid=%d, channel=%d\n", nentries,
                                ch->local_nentries, ch->partid, ch->number);

                        ch->local_nentries = nentries;
                }
                spin_unlock_irqrestore(&ch->lock, irq_flags);
                return xpcSuccess;
        }

        dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
                "queue, partid=%d, channel=%d\n", ch->partid, ch->number);
        return xpcNoMemory;
}


/*
 * Allocate the cached remote message queue.
 */
static enum xpc_retval
xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
{
        unsigned long irq_flags;
        int nentries;
        size_t nbytes;


        DBUG_ON(ch->remote_nentries <= 0);

        // >>> may want to check for ch->flags & XPC_C_DISCONNECTING between
        // >>> iterations of the for-loop, bail if set?

        // >>> should we impose a minimum # of entries? like 4 or 8?
        for (nentries = ch->remote_nentries; nentries > 0; nentries--) {

                nbytes = nentries * ch->msg_size;
                ch->remote_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
                                                GFP_KERNEL,
                                                &ch->remote_msgqueue_base);
                if (ch->remote_msgqueue == NULL) {
                        continue;
                }
                memset(ch->remote_msgqueue, 0, nbytes);

                spin_lock_irqsave(&ch->lock, irq_flags);
                if (nentries < ch->remote_nentries) {
                        dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
                                "partid=%d, channel=%d\n", nentries,
                                ch->remote_nentries, ch->partid, ch->number);

                        ch->remote_nentries = nentries;
                }
                spin_unlock_irqrestore(&ch->lock, irq_flags);
                return xpcSuccess;
        }

        dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
                "partid=%d, channel=%d\n", ch->partid, ch->number);
        return xpcNoMemory;
}


/*
 * Allocate message queues and other stuff associated with a channel.
 *
 * Note: Assumes all of the channel sizes are filled in.
 */
static enum xpc_retval
xpc_allocate_msgqueues(struct xpc_channel *ch)
{
        unsigned long irq_flags;
        enum xpc_retval ret;


        DBUG_ON(ch->flags & XPC_C_SETUP);

        if ((ret = xpc_allocate_local_msgqueue(ch)) != xpcSuccess) {
                return ret;
        }

        if ((ret = xpc_allocate_remote_msgqueue(ch)) != xpcSuccess) {
                kfree(ch->local_msgqueue_base);
                ch->local_msgqueue = NULL;
                kfree(ch->notify_queue);
                ch->notify_queue = NULL;
                return ret;
        }

        spin_lock_irqsave(&ch->lock, irq_flags);
        ch->flags |= XPC_C_SETUP;
        spin_unlock_irqrestore(&ch->lock, irq_flags);

        return xpcSuccess;
}


/*
 * Process a connect message from a remote partition.
 *
 * Note: xpc_process_connect() is expecting to be called with the
 * spin_lock_irqsave held and will leave it locked upon return.
 */
static void
xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
{
        enum xpc_retval ret;


        DBUG_ON(!spin_is_locked(&ch->lock));

        if (!(ch->flags & XPC_C_OPENREQUEST) ||
                                !(ch->flags & XPC_C_ROPENREQUEST)) {
                /* nothing more to do for now */
                return;
        }
        DBUG_ON(!(ch->flags & XPC_C_CONNECTING));

        if (!(ch->flags & XPC_C_SETUP)) {
                spin_unlock_irqrestore(&ch->lock, *irq_flags);
                ret = xpc_allocate_msgqueues(ch);
                spin_lock_irqsave(&ch->lock, *irq_flags);

                if (ret != xpcSuccess) {
                        XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
                }
                if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING)) {
                        return;
                }

                DBUG_ON(!(ch->flags & XPC_C_SETUP));
                DBUG_ON(ch->local_msgqueue == NULL);
                DBUG_ON(ch->remote_msgqueue == NULL);
        }

        if (!(ch->flags & XPC_C_OPENREPLY)) {
                ch->flags |= XPC_C_OPENREPLY;
                xpc_IPI_send_openreply(ch, irq_flags);
        }

        if (!(ch->flags & XPC_C_ROPENREPLY)) {
                return;
        }

        DBUG_ON(ch->remote_msgqueue_pa == 0);

        ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);    /* clear all else */

        dev_info(xpc_chan, "channel %d to partition %d connected\n",
                ch->number, ch->partid);

        spin_unlock_irqrestore(&ch->lock, *irq_flags);
        xpc_create_kthreads(ch, 1);
        spin_lock_irqsave(&ch->lock, *irq_flags);
}


/*
 * Notify those who wanted to be notified upon delivery of their message.
 */
static void
xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put)
{
        struct xpc_notify *notify;
        u8 notify_type;
        s64 get = ch->w_remote_GP.get - 1;


        while (++get < put && atomic_read(&ch->n_to_notify) > 0) {

                notify = &ch->notify_queue[get % ch->local_nentries];

                /*
                 * See if the notify entry indicates it was associated with
                 * a message whose sender wants to be notified. It is possible
                 * that it is, but someone else is doing or has done the
                 * notification.
                 */
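                /*
                 * The cmpxchg() atomically claims the entry so that only one
                 * thread ever performs the callout for it.
                 */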
                notify_type = notify->type;
                if (notify_type == 0 ||
                                cmpxchg(&notify->type, notify_type, 0) !=
                                                                notify_type) {
                        continue;
                }

                DBUG_ON(notify_type != XPC_N_CALL);

                atomic_dec(&ch->n_to_notify);

                if (notify->func != NULL) {
                        dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
                                "msg_number=%ld, partid=%d, channel=%d\n",
                                (void *) notify, get, ch->partid, ch->number);

                        notify->func(reason, ch->partid, ch->number,
                                                                notify->key);

                        dev_dbg(xpc_chan, "notify->func() returned, "
                                "notify=0x%p, msg_number=%ld, partid=%d, "
                                "channel=%d\n", (void *) notify, get,
                                ch->partid, ch->number);
                }
        }
}


/*
 * Free up message queues and other stuff that were allocated for the specified
 * channel.
 *
 * Note: ch->reason and ch->reason_line are left set for debugging purposes,
 * they're cleared when XPC_C_DISCONNECTED is cleared.
 */
static void
xpc_free_msgqueues(struct xpc_channel *ch)
{
        DBUG_ON(!spin_is_locked(&ch->lock));
        DBUG_ON(atomic_read(&ch->n_to_notify) != 0);

        ch->remote_msgqueue_pa = 0;
        ch->func = NULL;
        ch->key = NULL;
        ch->msg_size = 0;
        ch->local_nentries = 0;
        ch->remote_nentries = 0;
        ch->kthreads_assigned_limit = 0;
        ch->kthreads_idle_limit = 0;

        ch->local_GP->get = 0;
        ch->local_GP->put = 0;
        ch->remote_GP.get = 0;
        ch->remote_GP.put = 0;
        ch->w_local_GP.get = 0;
        ch->w_local_GP.put = 0;
        ch->w_remote_GP.get = 0;
        ch->w_remote_GP.put = 0;
        ch->next_msg_to_pull = 0;

        if (ch->flags & XPC_C_SETUP) {
                ch->flags &= ~XPC_C_SETUP;

                dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
                        ch->flags, ch->partid, ch->number);

                kfree(ch->local_msgqueue_base);
                ch->local_msgqueue = NULL;
                kfree(ch->remote_msgqueue_base);
                ch->remote_msgqueue = NULL;
                kfree(ch->notify_queue);
                ch->notify_queue = NULL;
        }
}


/*
 * spin_lock_irqsave() is expected to be held on entry.
 */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
        struct xpc_partition *part = &xpc_partitions[ch->partid];
        u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);


        DBUG_ON(!spin_is_locked(&ch->lock));

        if (!(ch->flags & XPC_C_DISCONNECTING)) {
                return;
        }

        DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

        /* make sure all activity has settled down first */

        if (atomic_read(&ch->references) > 0 ||
                        ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
                        !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE))) {
                return;
        }
        DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);

        if (part->act_state == XPC_P_DEACTIVATING) {
                /* can't proceed until the other side disengages from us */
                if (xpc_partition_engaged(1UL << ch->partid)) {
                        return;
                }

        } else {

                /* as long as the other side is up do the full protocol */

                if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
                        return;
                }

                if (!(ch->flags & XPC_C_CLOSEREPLY)) {
                        ch->flags |= XPC_C_CLOSEREPLY;
                        xpc_IPI_send_closereply(ch, irq_flags);
                }

                if (!(ch->flags & XPC_C_RCLOSEREPLY)) {
                        return;
                }
        }

        /* wake those waiting for notify completion */
        if (atomic_read(&ch->n_to_notify) > 0) {
                /* >>> we do callout while holding ch->lock */
                xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
        }

        /* both sides are disconnected now */

        if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
                spin_unlock_irqrestore(&ch->lock, *irq_flags);
                xpc_disconnect_callout(ch, xpcDisconnected);
                spin_lock_irqsave(&ch->lock, *irq_flags);
        }

        /* it's now safe to free the channel's message queues */
        xpc_free_msgqueues(ch);

        /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
        ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

        atomic_dec(&part->nchannels_active);

        if (channel_was_connected) {
                dev_info(xpc_chan, "channel %d to partition %d disconnected, "
                        "reason=%d\n", ch->number, ch->partid, ch->reason);
        }

        if (ch->flags & XPC_C_WDISCONNECT) {
                /* we won't lose the CPU since we're holding ch->lock */
                complete(&ch->wdisconnect_wait);
        } else if (ch->delayed_IPI_flags) {
                if (part->act_state != XPC_P_DEACTIVATING) {
                        /* time to take action on any delayed IPI flags */
                        spin_lock(&part->IPI_lock);
                        XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
                                                        ch->delayed_IPI_flags);
                        spin_unlock(&part->IPI_lock);
                }
                ch->delayed_IPI_flags = 0;
        }
}


/*
 * Process a change in the channel's remote connection state.
 */
static void
xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                                u8 IPI_flags)
{
        unsigned long irq_flags;
        struct xpc_openclose_args *args =
                                &part->remote_openclose_args[ch_number];
        struct xpc_channel *ch = &part->channels[ch_number];
        enum xpc_retval reason;



        spin_lock_irqsave(&ch->lock, irq_flags);

again:
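        /*
         * IPI flags for the next connection state can arrive packed with the
         * flags that completed the last one; after a disconnect is processed
         * below, control jumps back here to handle whatever flags remain.
         */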

        if ((ch->flags & XPC_C_DISCONNECTED) &&
                                        (ch->flags & XPC_C_WDISCONNECT)) {
                /*
                 * Delay processing IPI flags until thread waiting disconnect
                 * has had a chance to see that the channel is disconnected.
                 */
                ch->delayed_IPI_flags |= IPI_flags;
                spin_unlock_irqrestore(&ch->lock, irq_flags);
                return;
        }


        if (IPI_flags & XPC_IPI_CLOSEREQUEST) {

                dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
                        "from partid=%d, channel=%d\n", args->reason,
                        ch->partid, ch->number);

                /*
                 * If RCLOSEREQUEST is set, we're probably waiting for
                 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
                 * with this RCLOSEREQUEST in the IPI_flags.
                 */

                if (ch->flags & XPC_C_RCLOSEREQUEST) {
                        DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
                        DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
                        DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
                        DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);

                        DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
                        IPI_flags &= ~XPC_IPI_CLOSEREPLY;
                        ch->flags |= XPC_C_RCLOSEREPLY;

                        /* both sides have finished disconnecting */
                        xpc_process_disconnect(ch, &irq_flags);
                        DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
                        goto again;
                }

                if (ch->flags & XPC_C_DISCONNECTED) {
                        if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
                                if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
                                         ch_number) & XPC_IPI_OPENREQUEST)) {

                                        DBUG_ON(ch->delayed_IPI_flags != 0);
                                        spin_lock(&part->IPI_lock);
                                        XPC_SET_IPI_FLAGS(part->local_IPI_amo,
                                                        ch_number,
                                                        XPC_IPI_CLOSEREQUEST);
                                        spin_unlock(&part->IPI_lock);
                                }
                                spin_unlock_irqrestore(&ch->lock, irq_flags);
                                return;
                        }

                        XPC_SET_REASON(ch, 0, 0);
                        ch->flags &= ~XPC_C_DISCONNECTED;

                        atomic_inc(&part->nchannels_active);
                        ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
                }

                IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);

                /*
                 * The meaningful CLOSEREQUEST connection state fields are:
                 *      reason = reason connection is to be closed
                 */

                ch->flags |= XPC_C_RCLOSEREQUEST;

                if (!(ch->flags & XPC_C_DISCONNECTING)) {
                        reason = args->reason;
                        if (reason <= xpcSuccess || reason > xpcUnknownReason) {
                                reason = xpcUnknownReason;
                        } else if (reason == xpcUnregistering) {
                                reason = xpcOtherUnregistering;
                        }

                        XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

                        DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }

                xpc_process_disconnect(ch, &irq_flags);
        }


        if (IPI_flags & XPC_IPI_CLOSEREPLY) {

                dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
                        " channel=%d\n", ch->partid, ch->number);

                if (ch->flags & XPC_C_DISCONNECTED) {
                        DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }

                DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

                if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
                        if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
                                                & XPC_IPI_CLOSEREQUEST)) {

                                DBUG_ON(ch->delayed_IPI_flags != 0);
                                spin_lock(&part->IPI_lock);
                                XPC_SET_IPI_FLAGS(part->local_IPI_amo,
                                                ch_number, XPC_IPI_CLOSEREPLY);
                                spin_unlock(&part->IPI_lock);
                        }
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }

                ch->flags |= XPC_C_RCLOSEREPLY;

                if (ch->flags & XPC_C_CLOSEREPLY) {
                        /* both sides have finished disconnecting */
                        xpc_process_disconnect(ch, &irq_flags);
                }
        }


        if (IPI_flags & XPC_IPI_OPENREQUEST) {

                dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
                        "local_nentries=%d) received from partid=%d, "
                        "channel=%d\n", args->msg_size, args->local_nentries,
                        ch->partid, ch->number);

                if (part->act_state == XPC_P_DEACTIVATING ||
                                        (ch->flags & XPC_C_ROPENREQUEST)) {
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }

                if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
                        ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }
                DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
                                                        XPC_C_OPENREQUEST)));
                DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
                                        XPC_C_OPENREPLY | XPC_C_CONNECTED));

                /*
                 * The meaningful OPENREQUEST connection state fields are:
                 *      msg_size = size of channel's messages in bytes
                 *      local_nentries = remote partition's local_nentries
                 */
                if (args->msg_size == 0 || args->local_nentries == 0) {
                        /* assume OPENREQUEST was delayed by mistake */
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }

                ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
                ch->remote_nentries = args->local_nentries;


                if (ch->flags & XPC_C_OPENREQUEST) {
                        if (args->msg_size != ch->msg_size) {
                                XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes,
                                                                &irq_flags);
                                spin_unlock_irqrestore(&ch->lock, irq_flags);
                                return;
                        }
                } else {
                        ch->msg_size = args->msg_size;

                        XPC_SET_REASON(ch, 0, 0);
                        ch->flags &= ~XPC_C_DISCONNECTED;

                        atomic_inc(&part->nchannels_active);
                }

                xpc_process_connect(ch, &irq_flags);
        }


        if (IPI_flags & XPC_IPI_OPENREPLY) {

                dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
                        "local_nentries=%d, remote_nentries=%d) received from "
                        "partid=%d, channel=%d\n", args->local_msgqueue_pa,
                        args->local_nentries, args->remote_nentries,
                        ch->partid, ch->number);

                if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }
                if (!(ch->flags & XPC_C_OPENREQUEST)) {
                        XPC_DISCONNECT_CHANNEL(ch, xpcOpenCloseError,
                                                                &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }

                DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
                DBUG_ON(ch->flags & XPC_C_CONNECTED);

                /*
                 * The meaningful OPENREPLY connection state fields are:
                 *      local_msgqueue_pa = physical address of remote
                 *                          partition's local_msgqueue
                 *      local_nentries = remote partition's local_nentries
                 *      remote_nentries = remote partition's remote_nentries
                 */
                DBUG_ON(args->local_msgqueue_pa == 0);
                DBUG_ON(args->local_nentries == 0);
                DBUG_ON(args->remote_nentries == 0);

                ch->flags |= XPC_C_ROPENREPLY;
                ch->remote_msgqueue_pa = args->local_msgqueue_pa;

                if (args->local_nentries < ch->remote_nentries) {
                        dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
                                "remote_nentries=%d, old remote_nentries=%d, "
                                "partid=%d, channel=%d\n",
                                args->local_nentries, ch->remote_nentries,
                                ch->partid, ch->number);

                        ch->remote_nentries = args->local_nentries;
                }
                if (args->remote_nentries < ch->local_nentries) {
                        dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
                                "local_nentries=%d, old local_nentries=%d, "
                                "partid=%d, channel=%d\n",
                                args->remote_nentries, ch->local_nentries,
                                ch->partid, ch->number);

                        ch->local_nentries = args->remote_nentries;
                }

                xpc_process_connect(ch, &irq_flags);
        }

        spin_unlock_irqrestore(&ch->lock, irq_flags);
}


/*
 * Attempt to establish a channel connection to a remote partition.
 */
static enum xpc_retval
xpc_connect_channel(struct xpc_channel *ch)
{
        unsigned long irq_flags;
        struct xpc_registration *registration = &xpc_registrations[ch->number];

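        /*
         * Don't block on the registration mutex; if another thread holds it,
         * return xpcRetry so the caller will attempt the connect again on a
         * later pass.
         */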
        if (mutex_trylock(&registration->mutex) == 0) {
                return xpcRetry;
        }

        if (!XPC_CHANNEL_REGISTERED(ch->number)) {
                mutex_unlock(&registration->mutex);
                return xpcUnregistered;
        }

        spin_lock_irqsave(&ch->lock, irq_flags);

        DBUG_ON(ch->flags & XPC_C_CONNECTED);
        DBUG_ON(ch->flags & XPC_C_OPENREQUEST);

        if (ch->flags & XPC_C_DISCONNECTING) {
                spin_unlock_irqrestore(&ch->lock, irq_flags);
                mutex_unlock(&registration->mutex);
                return ch->reason;
        }


        /* add info from the channel connect registration to the channel */

        ch->kthreads_assigned_limit = registration->assigned_limit;
        ch->kthreads_idle_limit = registration->idle_limit;
        DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
        DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
        DBUG_ON(atomic_read(&ch->kthreads_active) != 0);

        ch->func = registration->func;
        DBUG_ON(registration->func == NULL);
        ch->key = registration->key;

        ch->local_nentries = registration->nentries;

        if (ch->flags & XPC_C_ROPENREQUEST) {
                if (registration->msg_size != ch->msg_size) {
                        /* the local and remote sides aren't the same */

                        /*
                         * Because XPC_DISCONNECT_CHANNEL() can block we're
                         * forced to release the registration mutex before we
                         * unlock the channel lock. But that's okay here
                         * because we're done with the part that required the
                         * registration mutex. XPC_DISCONNECT_CHANNEL()
                         * requires that the channel lock be locked and will
                         * unlock and relock the channel lock as needed.
                         */
                        mutex_unlock(&registration->mutex);
                        XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes,
                                                                &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return xpcUnequalMsgSizes;
                }
        } else {
                ch->msg_size = registration->msg_size;

                XPC_SET_REASON(ch, 0, 0);
                ch->flags &= ~XPC_C_DISCONNECTED;

                atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
        }

        mutex_unlock(&registration->mutex);


        /* initiate the connection */

        ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
        xpc_IPI_send_openrequest(ch, &irq_flags);

        xpc_process_connect(ch, &irq_flags);

        spin_unlock_irqrestore(&ch->lock, irq_flags);

        return xpcSuccess;
}


/*
 * Clear some of the msg flags in the local message queue.
 */
static inline void
xpc_clear_local_msgqueue_flags(struct xpc_channel *ch)
{
        struct xpc_msg *msg;
        s64 get;


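        /*
         * Clear the flags of every message the other side has acknowledged
         * receiving; the volatile cast is meant to force remote_GP.get to be
         * reread on each iteration of the loop.
         */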
        get = ch->w_remote_GP.get;
        do {
                msg = (struct xpc_msg *) ((u64) ch->local_msgqueue +
                                (get % ch->local_nentries) * ch->msg_size);
                msg->flags = 0;
        } while (++get < (volatile s64) ch->remote_GP.get);
}


/*
 * Clear some of the msg flags in the remote message queue.
 */
static inline void
xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch)
{
        struct xpc_msg *msg;
        s64 put;


        put = ch->w_remote_GP.put;
        do {
                msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue +
                                (put % ch->remote_nentries) * ch->msg_size);
                msg->flags = 0;
        } while (++put < (volatile s64) ch->remote_GP.put);
}


static void
xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
{
        struct xpc_channel *ch = &part->channels[ch_number];
        int nmsgs_sent;

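        /* snapshot the GP values just pulled from the remote partition */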
        ch->remote_GP = part->remote_GPs[ch_number];


        /* See what, if anything, has changed for each connected channel */

        xpc_msgqueue_ref(ch);

        if (ch->w_remote_GP.get == ch->remote_GP.get &&
                                ch->w_remote_GP.put == ch->remote_GP.put) {
                /* nothing changed since GPs were last pulled */
                xpc_msgqueue_deref(ch);
                return;
        }

        if (!(ch->flags & XPC_C_CONNECTED)) {
                xpc_msgqueue_deref(ch);
                return;
        }


        /*
         * First check to see if messages recently sent by us have been
         * received by the other side. (The remote GET value will have
         * changed since we last looked at it.)
         */

        if (ch->w_remote_GP.get != ch->remote_GP.get) {

                /*
                 * We need to notify any senders that want to be notified
                 * that their sent messages have been received by their
                 * intended recipients. We need to do this before updating
                 * w_remote_GP.get so that we don't allocate the same message
                 * queue entries prematurely (see xpc_allocate_msg()).
                 */
                if (atomic_read(&ch->n_to_notify) > 0) {
                        /*
                         * Notify senders that messages sent have been
                         * received and delivered by the other side.
                         */
                        xpc_notify_senders(ch, xpcMsgDelivered,
                                                        ch->remote_GP.get);
                }

                /*
                 * Clear msg->flags in previously sent messages, so that
                 * they're ready for xpc_allocate_msg().
                 */
                xpc_clear_local_msgqueue_flags(ch);

                ch->w_remote_GP.get = ch->remote_GP.get;

                dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
                        "channel=%d\n", ch->w_remote_GP.get, ch->partid,
                        ch->number);

                /*
                 * If anyone was waiting for message queue entries to become
                 * available, wake them up.
                 */
                if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) {
                        wake_up(&ch->msg_allocate_wq);
                }
        }


        /*
         * Now check for newly sent messages by the other side. (The remote
         * PUT value will have changed since we last looked at it.)
         */

        if (ch->w_remote_GP.put != ch->remote_GP.put) {
                /*
                 * Clear msg->flags in previously received messages, so that
                 * they're ready for xpc_get_deliverable_msg().
                 */
                xpc_clear_remote_msgqueue_flags(ch);

                ch->w_remote_GP.put = ch->remote_GP.put;

                dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
                        "channel=%d\n", ch->w_remote_GP.put, ch->partid,
                        ch->number);

                nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
                if (nmsgs_sent > 0) {
                        dev_dbg(xpc_chan, "msgs waiting to be copied and "
                                "delivered=%d, partid=%d, channel=%d\n",
                                nmsgs_sent, ch->partid, ch->number);

                        if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
                                xpc_activate_kthreads(ch, nmsgs_sent);
                        }
                }
        }

        xpc_msgqueue_deref(ch);
}


void
xpc_process_channel_activity(struct xpc_partition *part)
{
        unsigned long irq_flags;
        u64 IPI_amo, IPI_flags;
        struct xpc_channel *ch;
        int ch_number;
        u32 ch_flags;


        IPI_amo = xpc_get_IPI_flags(part);

        /*
         * Initiate channel connections for registered channels.
         *
         * For each connected channel that has pending messages activate idle
         * kthreads and/or create new kthreads as needed.
         */

        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];


                /*
                 * Process any open or close related IPI flags, and then deal
                 * with connecting or disconnecting the channel as required.
                 */

                IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);

                if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags)) {
                        xpc_process_openclose_IPI(part, ch_number, IPI_flags);
                }

                ch_flags = ch->flags;   /* need an atomic snapshot of flags */

                if (ch_flags & XPC_C_DISCONNECTING) {
                        spin_lock_irqsave(&ch->lock, irq_flags);
                        xpc_process_disconnect(ch, &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        continue;
                }

                if (part->act_state == XPC_P_DEACTIVATING) {
                        continue;
                }

                if (!(ch_flags & XPC_C_CONNECTED)) {
                        if (!(ch_flags & XPC_C_OPENREQUEST)) {
                                DBUG_ON(ch_flags & XPC_C_SETUP);
                                (void) xpc_connect_channel(ch);
                        } else {
                                spin_lock_irqsave(&ch->lock, irq_flags);
                                xpc_process_connect(ch, &irq_flags);
                                spin_unlock_irqrestore(&ch->lock, irq_flags);
                        }
                        continue;
                }


                /*
                 * Process any message-related IPI flags; this may involve the
                 * activation of kthreads to deliver any pending messages sent
                 * from the other partition.
                 */

                if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags)) {
                        xpc_process_msg_IPI(part, ch_number);
                }
        }
}


/*
 * XPC's heartbeat code calls this function to inform XPC that a partition is
 * going down.  XPC responds by tearing down the XPartition Communication
 * infrastructure used for the just downed partition.
 *
 * XPC's heartbeat code will never call this function and xpc_partition_up()
 * at the same time. Nor will it ever make multiple calls to either function
 * at the same time.
 */
void
xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason)
{
        unsigned long irq_flags;
        int ch_number;
        struct xpc_channel *ch;


        dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
                XPC_PARTID(part), reason);

        if (!xpc_part_ref(part)) {
                /* infrastructure for this partition isn't currently set up */
                return;
        }


        /* disconnect channels associated with the partition going down */

        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];

                xpc_msgqueue_ref(ch);
                spin_lock_irqsave(&ch->lock, irq_flags);

                XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

                spin_unlock_irqrestore(&ch->lock, irq_flags);
                xpc_msgqueue_deref(ch);
        }

        xpc_wakeup_channel_mgr(part);

        xpc_part_deref(part);
}


1434 /*
1435  * Teardown the infrastructure necessary to support XPartition Communication
1436  * between the specified remote partition and the local one.
1437  */
1438 void
1439 xpc_teardown_infrastructure(struct xpc_partition *part)
1440 {
1441         partid_t partid = XPC_PARTID(part);
1442
1443
1444         /*
1445          * We start off by making this partition inaccessible to local
1446          * processes by marking it as no longer set up. Then we make it
1447          * inaccessible to remote processes by clearing the magic # of the
1448          * XPC per-partition variables (which indicates that these variables
1449          * are no longer valid) and by ignoring all XPC notify IPIs sent to
1450          * this partition.
1451          */
1452
1453         DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
1454         DBUG_ON(atomic_read(&part->nchannels_active) != 0);
1455         DBUG_ON(part->setup_state != XPC_P_SETUP);
1456         part->setup_state = XPC_P_WTEARDOWN;
1457
1458         xpc_vars_part[partid].magic = 0;
1459
1460
1461         free_irq(SGI_XPC_NOTIFY, (void *) (u64) partid);
1462
1463
1464         /*
1465          * Before proceeding with the teardown, we have to wait until all
1466          * existing references cease.
1467          */
1468         wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));
1469
1470
1471         /* now we can begin tearing down the infrastructure */
1472
1473         part->setup_state = XPC_P_TORNDOWN;
1474
1475         /* in case we've still got outstanding timers registered... */
1476         del_timer_sync(&part->dropped_IPI_timer);
1477
1478         kfree(part->remote_openclose_args_base);
1479         part->remote_openclose_args = NULL;
1480         kfree(part->local_openclose_args_base);
1481         part->local_openclose_args = NULL;
1482         kfree(part->remote_GPs_base);
1483         part->remote_GPs = NULL;
1484         kfree(part->local_GPs_base);
1485         part->local_GPs = NULL;
1486         kfree(part->channels);
1487         part->channels = NULL;
1488         part->local_IPI_amo_va = NULL;
1489 }
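
/*
 * The *_base pointers freed above hold the raw kmalloc() addresses; the
 * working pointers (local_GPs, remote_GPs, and so on) are cacheline-aligned
 * aliases into those allocations. A minimal sketch of that pairing, assuming
 * the allocator used earlier in this file works along these lines:
 */
static inline void *
example_kmalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
{
        /* over-allocate so an L1-cacheline-aligned pointer always fits */
        *base = kmalloc(size + L1_CACHE_BYTES, flags);
        if (*base == NULL) {
                return NULL;
        }
        return (void *) L1_CACHE_ALIGN((u64) *base);
}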
1490
1491
1492 /*
1493  * Called by XP at the time of channel connection registration to cause
1494  * XPC to establish connections to all currently active partitions.
1495  */
1496 void
1497 xpc_initiate_connect(int ch_number)
1498 {
1499         partid_t partid;
1500         struct xpc_partition *part;
1501         struct xpc_channel *ch;
1502
1503
1504         DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);
1505
1506         for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
1507                 part = &xpc_partitions[partid];
1508
1509                 if (xpc_part_ref(part)) {
1510                         ch = &part->channels[ch_number];
1511
1512                         /*
1513                          * Initiate the establishment of a connection on the
1514                          * newly registered channel to the remote partition.
1515                          */
1516                         xpc_wakeup_channel_mgr(part);
1517                         xpc_part_deref(part);
1518                 }
1519         }
1520 }
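
/*
 * Usage sketch (hypothetical caller, not part of XPC): a registerer reaches
 * xpc_initiate_connect() through the xpc_connect() wrapper; its signature and
 * the values below (channel number, payload size, queue depth, kthread
 * limits) are assumptions for illustration only.
 */
static enum xpc_retval
example_register_channel(xpc_channel_func func, void *key)
{
        /* channel 5: 128-byte payloads, 64 queue entries, <= 4 kthreads
         * assigned to deliver messages, <= 2 of them allowed to idle */
        return xpc_connect(5, func, key, 128, 64, 4, 2);
}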
1521
1522
1523 void
1524 xpc_connected_callout(struct xpc_channel *ch)
1525 {
1526         /* let the registerer know that a connection has been established */
1527
1528         if (ch->func != NULL) {
1529                 dev_dbg(xpc_chan, "ch->func() called, reason=xpcConnected, "
1530                         "partid=%d, channel=%d\n", ch->partid, ch->number);
1531
1532                 ch->func(xpcConnected, ch->partid, ch->number,
1533                                 (void *) (u64) ch->local_nentries, ch->key);
1534
1535                 dev_dbg(xpc_chan, "ch->func() returned, reason=xpcConnected, "
1536                         "partid=%d, channel=%d\n", ch->partid, ch->number);
1537         }
1538 }
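
/*
 * Sketch of a registerer's channel function (illustrative only; the
 * xpc_channel_func signature is assumed from xp.h). As shown above, the
 * xpcConnected callout passes ch->local_nentries through the data argument.
 */
static void
example_channel_func(enum xpc_retval reason, partid_t partid, int ch_number,
                        void *data, void *key)
{
        if (reason == xpcConnected) {
                /* recover the entry count packed into the data pointer */
                u64 nentries = (u64) data;

                printk(KERN_INFO "channel %d to partition %d connected, "
                        "%lu entries\n", ch_number, partid,
                        (unsigned long) nentries);
        }
}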
1539
1540
1541 /*
1542  * Called by XP at the time of channel connection unregistration to cause
1543  * XPC to teardown all current connections for the specified channel.
1544  *
1545  * Before returning, xpc_initiate_disconnect() will wait until all connections
1546  * on the specified channel have been closed/torn down. So the caller can be
1547  * assured that they will not be receiving any more callouts from XPC to the
1548  * function they registered via xpc_connect().
1549  *
1550  * Arguments:
1551  *
1552  *      ch_number - channel # to unregister.
1553  */
1554 void
1555 xpc_initiate_disconnect(int ch_number)
1556 {
1557         unsigned long irq_flags;
1558         partid_t partid;
1559         struct xpc_partition *part;
1560         struct xpc_channel *ch;
1561
1562
1563         DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);
1564
1565         /* initiate the channel disconnect for every active partition */
1566         for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
1567                 part = &xpc_partitions[partid];
1568
1569                 if (xpc_part_ref(part)) {
1570                         ch = &part->channels[ch_number];
1571                         xpc_msgqueue_ref(ch);
1572
1573                         spin_lock_irqsave(&ch->lock, irq_flags);
1574
1575                         if (!(ch->flags & XPC_C_DISCONNECTED)) {
1576                                 ch->flags |= XPC_C_WDISCONNECT;
1577
1578                                 XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering,
1579                                                                 &irq_flags);
1580                         }
1581
1582                         spin_unlock_irqrestore(&ch->lock, irq_flags);
1583
1584                         xpc_msgqueue_deref(ch);
1585                         xpc_part_deref(part);
1586                 }
1587         }
1588
1589         xpc_disconnect_wait(ch_number);
1590 }
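
/*
 * Usage sketch (hypothetical): unregistration lands here via the
 * xpc_disconnect() wrapper, which is assumed from xp.h. Once it returns, the
 * old channel function will receive no further callouts.
 */
static void
example_unregister_channel(void)
{
        xpc_disconnect(5);      /* the illustration channel number from above */
}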
1591
1592
1593 /*
1594  * Disconnect a channel and reflect it back to all who may be waiting.
1595  *
1596  * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
1597  * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
1598  * xpc_disconnect_wait().
1599  *
1600  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
1601  */
1602 void
1603 xpc_disconnect_channel(const int line, struct xpc_channel *ch,
1604                         enum xpc_retval reason, unsigned long *irq_flags)
1605 {
1606         u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);
1607
1608
1609         DBUG_ON(!spin_is_locked(&ch->lock));
1610
1611         if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
1612                 return;
1613         }
1614         DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));
1615
1616         dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
1617                 reason, line, ch->partid, ch->number);
1618
1619         XPC_SET_REASON(ch, reason, line);
1620
1621         ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
1622         /* some of these may not have been set */
1623         ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
1624                         XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
1625                         XPC_C_CONNECTING | XPC_C_CONNECTED);
1626
1627         xpc_IPI_send_closerequest(ch, irq_flags);
1628
1629         if (channel_was_connected) {
1630                 ch->flags |= XPC_C_WASCONNECTED;
1631         }
1632
1633         spin_unlock_irqrestore(&ch->lock, *irq_flags);
1634
1635         /* wake all idle kthreads so they can exit */
1636         if (atomic_read(&ch->kthreads_idle) > 0) {
1637                 wake_up_all(&ch->idle_wq);
1638         }
1639
1640         /* wake those waiting to allocate an entry from the local msg queue */
1641         if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) {
1642                 wake_up(&ch->msg_allocate_wq);
1643         }
1644
1645         spin_lock_irqsave(&ch->lock, *irq_flags);
1646 }
1647
1648
1649 void
1650 xpc_disconnect_callout(struct xpc_channel *ch, enum xpc_retval reason)
1651 {
1652         /*
1653          * Let the channel's registerer know that the channel is being
1654          * disconnected. We don't want to do this if the registerer was never
1655          * informed of a connection being made.
1656          */
1657
1658         if (ch->func != NULL) {
1659                 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
1660                         "channel=%d\n", reason, ch->partid, ch->number);
1661
1662                 ch->func(reason, ch->partid, ch->number, NULL, ch->key);
1663
1664                 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
1665                         "channel=%d\n", reason, ch->partid, ch->number);
1666         }
1667 }
1668
1669
1670 /*
1671  * Wait for a message entry to become available for the specified channel,
1672  * but don't wait any longer than 1 jiffy.
1673  */
1674 static enum xpc_retval
1675 xpc_allocate_msg_wait(struct xpc_channel *ch)
1676 {
1677         enum xpc_retval ret;
1678
1679
1680         if (ch->flags & XPC_C_DISCONNECTING) {
1681                 DBUG_ON(ch->reason == xpcInterrupted);  /* >>> Is this true? */
1682                 return ch->reason;
1683         }
1684
1685         atomic_inc(&ch->n_on_msg_allocate_wq);
1686         ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
1687         atomic_dec(&ch->n_on_msg_allocate_wq);
1688
1689         if (ch->flags & XPC_C_DISCONNECTING) {
1690                 ret = ch->reason;
1691                 DBUG_ON(ch->reason == xpcInterrupted);  /* >>> Is this true? */
1692         } else if (ret == 0) {
1693                 ret = xpcTimeout;
1694         } else {
1695                 ret = xpcInterrupted;
1696         }
1697
1698         return ret;
1699 }
1700
1701
1702 /*
1703  * Allocate an entry for a message from the message queue associated with the
1704  * specified channel.
1705  */
1706 static enum xpc_retval
1707 xpc_allocate_msg(struct xpc_channel *ch, u32 flags,
1708                         struct xpc_msg **address_of_msg)
1709 {
1710         struct xpc_msg *msg;
1711         enum xpc_retval ret;
1712         s64 put;
1713
1714
1715         /* this reference will be dropped in xpc_send_msg() */
1716         xpc_msgqueue_ref(ch);
1717
1718         if (ch->flags & XPC_C_DISCONNECTING) {
1719                 xpc_msgqueue_deref(ch);
1720                 return ch->reason;
1721         }
1722         if (!(ch->flags & XPC_C_CONNECTED)) {
1723                 xpc_msgqueue_deref(ch);
1724                 return xpcNotConnected;
1725         }
1726
1727
1728         /*
1729          * Get the next available message entry from the local message queue.
1730          * If none are available, we'll make sure that we grab the latest
1731          * GP values.
1732          */
1733         ret = xpcTimeout;
1734
1735         while (1) {
1736
1737                 put = (volatile s64) ch->w_local_GP.put;
1738                 if (put - (volatile s64) ch->w_remote_GP.get <
1739                                                         ch->local_nentries) {
1740
1741                         /* There are available message entries. We need to try
1742                          * to secure one for ourselves. We'll do this by trying
1743                          * to increment w_local_GP.put as long as someone else
1744                          * doesn't beat us to it. If they do, we'll have to
1745                          * try again.
1746                          */
1747                         if (cmpxchg(&ch->w_local_GP.put, put, put + 1) ==
1748                                                                         put) {
1749                                 /* we got the entry referenced by put */
1750                                 break;
1751                         }
1752                         continue;       /* try again */
1753                 }
1754
1755
1756                 /*
1757                  * There aren't any available msg entries at this time.
1758                  *
1759                  * In waiting for a message entry to become available,
1760                  * we set a timeout in case the other side is not
1761                  * sending completion IPIs. This lets us fake an IPI
1762                  * that will cause the IPI handler to fetch the latest
1763                  * GP values as if an IPI was sent by the other side.
1764                  */
1765                 if (ret == xpcTimeout) {
1766                         xpc_IPI_send_local_msgrequest(ch);
1767                 }
1768
1769                 if (flags & XPC_NOWAIT) {
1770                         xpc_msgqueue_deref(ch);
1771                         return xpcNoWait;
1772                 }
1773
1774                 ret = xpc_allocate_msg_wait(ch);
1775                 if (ret != xpcInterrupted && ret != xpcTimeout) {
1776                         xpc_msgqueue_deref(ch);
1777                         return ret;
1778                 }
1779         }
1780
1781
1782         /* get the message's address and initialize it */
1783         msg = (struct xpc_msg *) ((u64) ch->local_msgqueue +
1784                                 (put % ch->local_nentries) * ch->msg_size);
1785
1786
1787         DBUG_ON(msg->flags != 0);
1788         msg->number = put;
1789
1790         dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, "
1791                 "msg_number=%ld, partid=%d, channel=%d\n", put + 1,
1792                 (void *) msg, msg->number, ch->partid, ch->number);
1793
1794         *address_of_msg = msg;
1795
1796         return xpcSuccess;
1797 }
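
/*
 * Worked example of the Get/Put arithmetic above: with local_nentries = 4,
 * w_remote_GP.get = 10 and w_local_GP.put = 13, put - get = 3 < 4, so one
 * entry is still free; the winning cmpxchg() advances put to 14 and the
 * caller owns slot 13 % 4 = 1. Had put already been 14, put - get would
 * equal local_nentries (queue full) and we'd wait for the remote Get to
 * advance.
 */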
1798
1799
1800 /*
1801  * Allocate an entry for a message from the message queue associated with the
1802  * specified channel. NOTE that this routine can sleep waiting for a message
1803  * entry to become available. To avoid sleeping, pass in the XPC_NOWAIT flag.
1804  *
1805  * Arguments:
1806  *
1807  *      partid - ID of partition to which the channel is connected.
1808  *      ch_number - channel #.
1809  *      flags - see xpc.h for valid flags.
1810  *      payload - address of the allocated payload area pointer (filled in on
1811  *                return) in which the user-defined message is constructed.
1812  */
1813 enum xpc_retval
1814 xpc_initiate_allocate(partid_t partid, int ch_number, u32 flags, void **payload)
1815 {
1816         struct xpc_partition *part = &xpc_partitions[partid];
1817         enum xpc_retval ret = xpcUnknownReason;
1818         struct xpc_msg *msg = NULL;    /* stays NULL on failed allocate */
1819
1820
1821         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
1822         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1823
1824         *payload = NULL;
1825
1826         if (xpc_part_ref(part)) {
1827                 ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg);
1828                 xpc_part_deref(part);
1829
1830                 if (msg != NULL) {
1831                         *payload = &msg->payload;
1832                 }
1833         }
1834
1835         return ret;
1836 }
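
/*
 * Usage sketch (hypothetical): a non-blocking allocation attempt. XPC_NOWAIT
 * comes from xp.h; the channel number is an illustration value.
 */
static void *
example_try_allocate(partid_t partid)
{
        void *payload;

        /* returns xpcNoWait rather than sleeping when the queue is full */
        if (xpc_initiate_allocate(partid, 5, XPC_NOWAIT, &payload) !=
                                                                xpcSuccess) {
                return NULL;
        }
        return payload;
}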
1837
1838
1839 /*
1840  * Now we actually send the messages that are ready to be sent by advancing
1841  * the local message queue's Put value and then sending an IPI to the
1842  * recipient partition.
1843  */
1844 static void
1845 xpc_send_msgs(struct xpc_channel *ch, s64 initial_put)
1846 {
1847         struct xpc_msg *msg;
1848         s64 put = initial_put + 1;
1849         int send_IPI = 0;
1850
1851
1852         while (1) {
1853
1854                 while (1) {
1855                         if (put == (volatile s64) ch->w_local_GP.put) {
1856                                 break;
1857                         }
1858
1859                         msg = (struct xpc_msg *) ((u64) ch->local_msgqueue +
1860                                (put % ch->local_nentries) * ch->msg_size);
1861
1862                         if (!(msg->flags & XPC_M_READY)) {
1863                                 break;
1864                         }
1865
1866                         put++;
1867                 }
1868
1869                 if (put == initial_put) {
1870                         /* nothing's changed */
1871                         break;
1872                 }
1873
1874                 if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) !=
1875                                                                 initial_put) {
1876                         /* someone else beat us to it */
1877                         DBUG_ON((volatile s64) ch->local_GP->put < initial_put);
1878                         break;
1879                 }
1880
1881                 /* we just set the new value of local_GP->put */
1882
1883                 dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, "
1884                         "channel=%d\n", put, ch->partid, ch->number);
1885
1886                 send_IPI = 1;
1887
1888                 /*
1889                  * We need to ensure that the message referenced by
1890                  * local_GP->put is not XPC_M_READY or that local_GP->put
1891                  * equals w_local_GP.put, so we'll go have a look.
1892                  */
1893                 initial_put = put;
1894         }
1895
1896         if (send_IPI) {
1897                 xpc_IPI_send_msgrequest(ch);
1898         }
1899 }
1900
1901
1902 /*
1903  * Common code that does the actual sending of the message by advancing the
1904  * local message queue's Put value and sending an IPI to the partition the
1905  * message is being sent to.
1906  */
1907 static enum xpc_retval
1908 xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
1909                         xpc_notify_func func, void *key)
1910 {
1911         enum xpc_retval ret = xpcSuccess;
1912         struct xpc_notify *notify = notify;  /* quiets bogus warning */
1913         s64 put, msg_number = msg->number;
1914
1915
1916         DBUG_ON(notify_type == XPC_N_CALL && func == NULL);
1917         DBUG_ON((((u64) msg - (u64) ch->local_msgqueue) / ch->msg_size) !=
1918                                         msg_number % ch->local_nentries);
1919         DBUG_ON(msg->flags & XPC_M_READY);
1920
1921         if (ch->flags & XPC_C_DISCONNECTING) {
1922                 /* drop the reference grabbed in xpc_allocate_msg() */
1923                 xpc_msgqueue_deref(ch);
1924                 return ch->reason;
1925         }
1926
1927         if (notify_type != 0) {
1928                 /*
1929                  * Tell the remote side to send an ACK interrupt when the
1930                  * message has been delivered.
1931                  */
1932                 msg->flags |= XPC_M_INTERRUPT;
1933
1934                 atomic_inc(&ch->n_to_notify);
1935
1936                 notify = &ch->notify_queue[msg_number % ch->local_nentries];
1937                 notify->func = func;
1938                 notify->key = key;
1939                 notify->type = notify_type;
1940
1941                 /* >>> is a mb() needed here? */
1942
1943                 if (ch->flags & XPC_C_DISCONNECTING) {
1944                         /*
1945                          * An error occurred between our last error check and
1946                          * this one. We will try to clear the type field from
1947                          * the notify entry. If we succeed then
1948                          * xpc_disconnect_channel() didn't already process
1949                          * the notify entry.
1950                          */
1951                         if (cmpxchg(&notify->type, notify_type, 0) ==
1952                                                                 notify_type) {
1953                                 atomic_dec(&ch->n_to_notify);
1954                                 ret = ch->reason;
1955                         }
1956
1957                         /* drop the reference grabbed in xpc_allocate_msg() */
1958                         xpc_msgqueue_deref(ch);
1959                         return ret;
1960                 }
1961         }
1962
1963         msg->flags |= XPC_M_READY;
1964
1965         /*
1966          * The preceding store of msg->flags must occur before the following
1967          * load of ch->local_GP->put.
1968          */
1969         mb();
1970
1971         /* see if the message is next in line to be sent, if so send it */
1972
1973         put = ch->local_GP->put;
1974         if (put == msg_number) {
1975                 xpc_send_msgs(ch, put);
1976         }
1977
1978         /* drop the reference grabbed in xpc_allocate_msg() */
1979         xpc_msgqueue_deref(ch);
1980         return ret;
1981 }
1982
1983
1984 /*
1985  * Send a message previously allocated using xpc_initiate_allocate() on the
1986  * specified channel connected to the specified partition.
1987  *
1988  * This routine will not wait for the message to be received, nor will
1989  * notification be given when it does happen. Once this routine has
1990  * returned, the message entry allocated via xpc_initiate_allocate() is no
1991  * longer accessible to the caller.
1992  *
1993  * This routine, although called by users, does not call xpc_part_ref() to
1994  * ensure that the partition infrastructure is in place. It relies on the
1995  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1996  *
1997  * Arguments:
1998  *
1999  *      partid - ID of partition to which the channel is connected.
2000  *      ch_number - channel # to send message on.
2001  *      payload - pointer to the payload area allocated via
2002  *                      xpc_initiate_allocate().
2003  */
2004 enum xpc_retval
2005 xpc_initiate_send(partid_t partid, int ch_number, void *payload)
2006 {
2007         struct xpc_partition *part = &xpc_partitions[partid];
2008         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
2009         enum xpc_retval ret;
2010
2011
2012         dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg,
2013                 partid, ch_number);
2014
2015         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
2016         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
2017         DBUG_ON(msg == NULL);
2018
2019         ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL);
2020
2021         return ret;
2022 }
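
/*
 * Usage sketch (hypothetical) tying allocation and sending together. XPC_WAIT
 * is assumed from xp.h to be the blocking counterpart of XPC_NOWAIT, and the
 * channel is assumed to have been registered with a payload size of at least
 * six bytes.
 */
static enum xpc_retval
example_send_hello(partid_t partid)
{
        void *payload;
        enum xpc_retval ret;

        ret = xpc_initiate_allocate(partid, 5, XPC_WAIT, &payload);
        if (ret != xpcSuccess) {
                return ret;
        }

        /* the payload area is ours until handed back via the send below */
        memcpy(payload, "hello", 6);

        return xpc_initiate_send(partid, 5, payload);
}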
2023
2024
2025 /*
2026  * Send a message previously allocated using xpc_initiate_allocate() on the
2027  * specified channel connected to the specified partition.
2028  *
2029  * This routine will not wait for the message to be sent. Once this routine
2030  * has returned, the message entry allocated via xpc_initiate_allocate() is
2031  * no longer accessible to the caller.
2032  *
2033  * Once the remote end of the channel has received the message, the function
2034  * passed as an argument to xpc_initiate_send_notify() will be called. This
2035  * allows the sender to free up or re-use any buffers referenced by the
2036  * message, but does NOT mean the message has been processed at the remote
2037  * end by a receiver.
2038  *
2039  * If this routine returns an error, the caller's function will NOT be called.
2040  *
2041  * This routine, although called by users, does not call xpc_part_ref() to
2042  * ensure that the partition infrastructure is in place. It relies on the
2043  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
2044  *
2045  * Arguments:
2046  *
2047  *      partid - ID of partition to which the channel is connected.
2048  *      ch_number - channel # to send message on.
2049  *      payload - pointer to the payload area allocated via
2050  *                      xpc_initiate_allocate().
2051  *      func - function to call with asynchronous notification of message
2052  *                receipt. THIS FUNCTION MUST BE NON-BLOCKING.
2053  *      key - user-defined key to be passed to the function when it's called.
2054  */
2055 enum xpc_retval
2056 xpc_initiate_send_notify(partid_t partid, int ch_number, void *payload,
2057                                 xpc_notify_func func, void *key)
2058 {
2059         struct xpc_partition *part = &xpc_partitions[partid];
2060         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
2061         enum xpc_retval ret;
2062
2063
2064         dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg,
2065                 partid, ch_number);
2066
2067         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
2068         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
2069         DBUG_ON(msg == NULL);
2070         DBUG_ON(func == NULL);
2071
2072         ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL,
2073                                                                 func, key);
2074         return ret;
2075 }
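
/*
 * Sketch of a notify function (illustrative; the xpc_notify_func signature
 * and the xpcMsgDelivered reason are assumed from xp.h). It runs once the
 * remote side has received the message, so buffers referenced by the message
 * may be recycled here; it must not block.
 */
static void
example_notify_func(enum xpc_retval reason, partid_t partid, int ch_number,
                        void *key)
{
        if (reason != xpcMsgDelivered) {
                printk(KERN_WARNING "msg on channel %d to partition %d not "
                        "delivered, reason=%d\n", ch_number, partid, reason);
        }
}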
2076
2077
2078 static struct xpc_msg *
2079 xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
2080 {
2081         struct xpc_partition *part = &xpc_partitions[ch->partid];
2082         struct xpc_msg *remote_msg, *msg;
2083         u32 msg_index, nmsgs;
2084         u64 msg_offset;
2085         enum xpc_retval ret;
2086
2087
2088         if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) {
2089                 /* we were interrupted by a signal */
2090                 return NULL;
2091         }
2092
2093         while (get >= ch->next_msg_to_pull) {
2094
2095                 /* pull as many messages as are ready and able to be pulled */
2096
2097                 msg_index = ch->next_msg_to_pull % ch->remote_nentries;
2098
2099                 DBUG_ON(ch->next_msg_to_pull >=
2100                                         (volatile s64) ch->w_remote_GP.put);
2101                 nmsgs = (volatile s64) ch->w_remote_GP.put -
2102                                                 ch->next_msg_to_pull;
2103                 if (msg_index + nmsgs > ch->remote_nentries) {
2104                         /* ignore the ones that wrap the msg queue for now */
2105                         nmsgs = ch->remote_nentries - msg_index;
2106                 }
2107
2108                 msg_offset = msg_index * ch->msg_size;
2109                 msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue +
2110                                                                 msg_offset);
2111                 remote_msg = (struct xpc_msg *) (ch->remote_msgqueue_pa +
2112                                                                 msg_offset);
2113
2114                 if ((ret = xpc_pull_remote_cachelines(part, msg, remote_msg,
2115                                 nmsgs * ch->msg_size)) != xpcSuccess) {
2116
2117                         dev_dbg(xpc_chan, "failed to pull %d msgs starting with"
2118                                 " msg %ld from partition %d, channel=%d, "
2119                                 "ret=%d\n", nmsgs, ch->next_msg_to_pull,
2120                                 ch->partid, ch->number, ret);
2121
2122                         XPC_DEACTIVATE_PARTITION(part, ret);
2123
2124                         mutex_unlock(&ch->msg_to_pull_mutex);
2125                         return NULL;
2126                 }
2127
2128                 mb();   /* >>> this may not be needed, we're not sure */
2129
2130                 ch->next_msg_to_pull += nmsgs;
2131         }
2132
2133         mutex_unlock(&ch->msg_to_pull_mutex);
2134
2135         /* return the message we were looking for */
2136         msg_offset = (get % ch->remote_nentries) * ch->msg_size;
2137         msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + msg_offset);
2138
2139         return msg;
2140 }
2141
2142
2143 /*
2144  * Get a message to be delivered.
2145  */
2146 static struct xpc_msg *
2147 xpc_get_deliverable_msg(struct xpc_channel *ch)
2148 {
2149         struct xpc_msg *msg = NULL;
2150         s64 get;
2151
2152
2153         do {
2154                 if ((volatile u32) ch->flags & XPC_C_DISCONNECTING) {
2155                         break;
2156                 }
2157
2158                 get = (volatile s64) ch->w_local_GP.get;
2159                 if (get == (volatile s64) ch->w_remote_GP.put) {
2160                         break;
2161                 }
2162
2163                 /* There are messages waiting to be pulled and delivered.
2164                  * We need to try to secure one for ourselves. We'll do this
2165                  * by trying to increment w_local_GP.get and hope that no one
2166                  * else beats us to it. If they do, we'll simply have to
2167                  * try again for the next one.
2168                  */
2169
2170                 if (cmpxchg(&ch->w_local_GP.get, get, get + 1) == get) {
2171                         /* we got the entry referenced by get */
2172
2173                         dev_dbg(xpc_chan, "w_local_GP.get changed to %ld, "
2174                                 "partid=%d, channel=%d\n", get + 1,
2175                                 ch->partid, ch->number);
2176
2177                         /* pull the message from the remote partition */
2178
2179                         msg = xpc_pull_remote_msg(ch, get);
2180
2181                         DBUG_ON(msg != NULL && msg->number != get);
2182                         DBUG_ON(msg != NULL && (msg->flags & XPC_M_DONE));
2183                         DBUG_ON(msg != NULL && !(msg->flags & XPC_M_READY));
2184
2185                         break;
2186                 }
2187
2188         } while (1);
2189
2190         return msg;
2191 }
2192
2193
2194 /*
2195  * Deliver a message to its intended recipient.
2196  */
2197 void
2198 xpc_deliver_msg(struct xpc_channel *ch)
2199 {
2200         struct xpc_msg *msg;
2201
2202
2203         if ((msg = xpc_get_deliverable_msg(ch)) != NULL) {
2204
2205                 /*
2206                  * This ref is taken to protect the payload itself from being
2207                  * freed before the user is finished with it, which the user
2208                  * indicates by calling xpc_initiate_received().
2209                  */
2210                 xpc_msgqueue_ref(ch);
2211
2212                 atomic_inc(&ch->kthreads_active);
2213
2214                 if (ch->func != NULL) {
2215                         dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
2216                                 "msg_number=%ld, partid=%d, channel=%d\n",
2217                                 (void *) msg, msg->number, ch->partid,
2218                                 ch->number);
2219
2220                         /* deliver the message to its intended recipient */
2221                         ch->func(xpcMsgReceived, ch->partid, ch->number,
2222                                         &msg->payload, ch->key);
2223
2224                         dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
2225                                 "msg_number=%ld, partid=%d, channel=%d\n",
2226                                 (void *) msg, msg->number, ch->partid,
2227                                 ch->number);
2228                 }
2229
2230                 atomic_dec(&ch->kthreads_active);
2231         }
2232 }
2233
2234
2235 /*
2236  * Now we actually acknowledge the messages that have been delivered and ack'd
2237  * by advancing the cached remote message queue's Get value and, if
2238  * requested, sending an IPI to the message sender's partition.
2239  */
2240 static void
2241 xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags)
2242 {
2243         struct xpc_msg *msg;
2244         s64 get = initial_get + 1;
2245         int send_IPI = 0;
2246
2247
2248         while (1) {
2249
2250                 while (1) {
2251                         if (get == (volatile s64) ch->w_local_GP.get) {
2252                                 break;
2253                         }
2254
2255                         msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue +
2256                                (get % ch->remote_nentries) * ch->msg_size);
2257
2258                         if (!(msg->flags & XPC_M_DONE)) {
2259                                 break;
2260                         }
2261
2262                         msg_flags |= msg->flags;
2263                         get++;
2264                 }
2265
2266                 if (get == initial_get) {
2267                         /* nothing's changed */
2268                         break;
2269                 }
2270
2271                 if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) !=
2272                                                                 initial_get) {
2273                         /* someone else beat us to it */
2274                         DBUG_ON((volatile s64) ch->local_GP->get <=
2275                                                                 initial_get);
2276                         break;
2277                 }
2278
2279                 /* we just set the new value of local_GP->get */
2280
2281                 dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, "
2282                         "channel=%d\n", get, ch->partid, ch->number);
2283
2284                 send_IPI = (msg_flags & XPC_M_INTERRUPT);
2285
2286                 /*
2287                  * We need to ensure that the message referenced by
2288                  * local_GP->get is not XPC_M_DONE or that local_GP->get
2289                  * equals w_local_GP.get, so we'll go have a look.
2290                  */
2291                 initial_get = get;
2292         }
2293
2294         if (send_IPI) {
2295                 xpc_IPI_send_msgrequest(ch);
2296         }
2297 }
2298
2299
2300 /*
2301  * Acknowledge receipt of a delivered message.
2302  *
2303  * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
2304  * that sent the message.
2305  *
2306  * This function, although called by users, does not call xpc_part_ref() to
2307  * ensure that the partition infrastructure is in place. It relies on the
2308  * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
2309  *
2310  * Arguments:
2311  *
2312  *      partid - ID of partition to which the channel is connected.
2313  *      ch_number - channel # message received on.
2314  *      payload - pointer to the payload area allocated via
2315  *                      xpc_initiate_allocate().
2316  */
2317 void
2318 xpc_initiate_received(partid_t partid, int ch_number, void *payload)
2319 {
2320         struct xpc_partition *part = &xpc_partitions[partid];
2321         struct xpc_channel *ch;
2322         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
2323         s64 get, msg_number = msg->number;
2324
2325
2326         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
2327         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
2328
2329         ch = &part->channels[ch_number];
2330
2331         dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n",
2332                 (void *) msg, msg_number, ch->partid, ch->number);
2333
2334         DBUG_ON((((u64) msg - (u64) ch->remote_msgqueue) / ch->msg_size) !=
2335                                         msg_number % ch->remote_nentries);
2336         DBUG_ON(msg->flags & XPC_M_DONE);
2337
2338         msg->flags |= XPC_M_DONE;
2339
2340         /*
2341          * The preceding store of msg->flags must occur before the following
2342          * load of ch->local_GP->get.
2343          */
2344         mb();
2345
2346         /*
2347          * See if this message is next in line to be acknowledged as having
2348          * been delivered.
2349          */
2350         get = ch->local_GP->get;
2351         if (get == msg_number) {
2352                 xpc_acknowledge_msgs(ch, get, msg->flags);
2353         }
2354
2355         /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */
2356         xpc_msgqueue_deref(ch);
2357 }
2358
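
/*
 * Receive-side sketch (illustrative; the xpc_channel_func signature is
 * assumed from xp.h): xpc_deliver_msg() above hands the registerer a pointer
 * to the payload via the xpcMsgReceived callout, and the registerer releases
 * the entry with xpc_initiate_received() when done. The string payload is an
 * assumption carried over from the earlier send sketch.
 */
static void
example_recv_func(enum xpc_retval reason, partid_t partid, int ch_number,
                        void *data, void *key)
{
        if (reason == xpcMsgReceived) {
                char *payload = (char *) data;

                dev_dbg(xpc_chan, "got '%s' from partition %d\n", payload,
                        partid);

                /* done with the payload; let XPC advance its Get value */
                xpc_initiate_received(partid, ch_number, payload);
        }
}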