/*
 * linux/fs/9p/mux.c
 *
 * Protocol Multiplexer
 *
 *  Copyright (C) 2004 by Eric Van Hensbergen <ericvh@gmail.com>
 *  Copyright (C) 2004-2005 by Latchesar Ionkov <lucho@ionkov.net>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2
 *  as published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to:
 *  Free Software Foundation
 *  51 Franklin Street, Fifth Floor
 *  Boston, MA  02111-1301  USA
 *
 */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/errno.h>
#include <linux/fs.h>
#include <linux/poll.h>
#include <linux/kthread.h>
#include <linux/idr.h>
#include <linux/mutex.h>

#include "debug.h"
#include "v9fs.h"
#include "9p.h"
#include "conv.h"
#include "transport.h"
#include "mux.h"

#define ERREQFLUSH      1
#define SCHED_TIMEOUT   10
#define MAXPOLLWADDR    2

enum {
        Rworksched = 1,         /* read work scheduled or running */
        Rpending = 2,           /* can read */
        Wworksched = 4,         /* write work scheduled or running */
        Wpending = 8,           /* can write */
};

enum {
        None,                   /* request is not being flushed */
        Flushing,               /* a Tflush for the request is in flight */
        Flushed,                /* request was flushed; drop its response */
};

struct v9fs_mux_poll_task;

struct v9fs_req {
        spinlock_t lock;
        int tag;
        struct v9fs_fcall *tcall;
        struct v9fs_fcall *rcall;
        int err;
        v9fs_mux_req_callback cb;       /* called when the response arrives */
        void *cba;                      /* opaque argument passed to cb */
        int flush;                      /* None, Flushing or Flushed (above) */
        struct list_head req_list;
};

struct v9fs_mux_data {
        spinlock_t lock;
        struct list_head mux_list;
        struct v9fs_mux_poll_task *poll_task;
        int msize;
        unsigned char *extended;
        struct v9fs_transport *trans;
        struct v9fs_idpool tagpool;
        int err;
        wait_queue_head_t equeue;       /* woken whenever a request completes */
        struct list_head req_list;      /* requests sent, awaiting a response */
        struct list_head unsent_req_list;       /* requests not yet sent */
        struct v9fs_fcall *rcall;       /* response currently being assembled */
        int rpos;                       /* bytes of the response read so far */
        char *rbuf;                     /* receive buffer (points into rcall) */
        int wpos;                       /* bytes of the request written so far */
        int wsize;                      /* size of the pending write, 0 if none */
        char *wbuf;
        wait_queue_t poll_wait[MAXPOLLWADDR];
        wait_queue_head_t *poll_waddr[MAXPOLLWADDR];
        poll_table pt;
        struct work_struct rq;          /* read work */
        struct work_struct wq;          /* write work */
        unsigned long wsched;           /* R/Wpending and R/Wworksched bits */
};

struct v9fs_mux_poll_task {
        struct task_struct *task;
        struct list_head mux_list;
        int muxnum;
};

struct v9fs_mux_rpc {
        struct v9fs_mux_data *m;
        int err;
        struct v9fs_fcall *tcall;
        struct v9fs_fcall *rcall;
        wait_queue_head_t wqueue;
};

static int v9fs_poll_proc(void *);
static void v9fs_read_work(void *);
static void v9fs_write_work(void *);
static void v9fs_pollwait(struct file *filp, wait_queue_head_t * wait_address,
                          poll_table * p);
static u16 v9fs_mux_get_tag(struct v9fs_mux_data *);
static void v9fs_mux_put_tag(struct v9fs_mux_data *, u16);

static DEFINE_MUTEX(v9fs_mux_task_lock);
static struct workqueue_struct *v9fs_mux_wq;

static int v9fs_mux_num;
static int v9fs_mux_poll_task_num;
static struct v9fs_mux_poll_task v9fs_mux_poll_tasks[100];

int v9fs_mux_global_init(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++)
                v9fs_mux_poll_tasks[i].task = NULL;

        v9fs_mux_wq = create_workqueue("v9fs");
        if (!v9fs_mux_wq)
                return -ENOMEM;

        return 0;
}

void v9fs_mux_global_exit(void)
{
        destroy_workqueue(v9fs_mux_wq);
}
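
/*
 * (Editorial note: these two functions are the module-wide setup and
 * teardown; they are expected to be called once from the v9fs module
 * init/exit path, before the first v9fs_mux_init and after the last
 * v9fs_mux_destroy respectively.)
 */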

/**
 * v9fs_mux_calc_poll_procs - calculates the number of polling procs
 * based on the number of mounted v9fs filesystems.
 *
 * The current implementation returns the number of mounts divided by
 * the current number of poll tasks, rounded up (or 1 if there are no
 * poll tasks yet), capped at ARRAY_SIZE(v9fs_mux_poll_tasks).
 */
static int v9fs_mux_calc_poll_procs(int muxnum)
{
        int n;

        if (v9fs_mux_poll_task_num)
                n = muxnum / v9fs_mux_poll_task_num +
                    (muxnum % v9fs_mux_poll_task_num ? 1 : 0);
        else
                n = 1;

        if (n > ARRAY_SIZE(v9fs_mux_poll_tasks))
                n = ARRAY_SIZE(v9fs_mux_poll_tasks);

        return n;
}
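
/*
 * Worked example (illustrative, not part of the original source): with
 * 2 poll tasks running and a 5th mount being added,
 * v9fs_mux_calc_poll_procs(5) = 5/2 + 1 = 3, so v9fs_mux_poll_start
 * below will try to create a third poll task.
 */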

static int v9fs_mux_poll_start(struct v9fs_mux_data *m)
{
        int i, n;
        struct v9fs_mux_poll_task *vpt, *vptlast;
        struct task_struct *pproc;

        dprintk(DEBUG_MUX, "mux %p muxnum %d procnum %d\n", m, v9fs_mux_num,
                v9fs_mux_poll_task_num);
        mutex_lock(&v9fs_mux_task_lock);

        n = v9fs_mux_calc_poll_procs(v9fs_mux_num + 1);
        if (n > v9fs_mux_poll_task_num) {
                for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++) {
                        if (v9fs_mux_poll_tasks[i].task == NULL) {
                                vpt = &v9fs_mux_poll_tasks[i];
                                dprintk(DEBUG_MUX, "create proc %p\n", vpt);
                                pproc = kthread_create(v9fs_poll_proc, vpt,
                                                   "v9fs-poll");

                                if (!IS_ERR(pproc)) {
                                        vpt->task = pproc;
                                        INIT_LIST_HEAD(&vpt->mux_list);
                                        vpt->muxnum = 0;
                                        v9fs_mux_poll_task_num++;
                                        wake_up_process(vpt->task);
                                }
                                break;
                        }
                }

                if (i >= ARRAY_SIZE(v9fs_mux_poll_tasks))
                        dprintk(DEBUG_ERROR, "warning: no free poll slots\n");
        }

        n = (v9fs_mux_num + 1) / v9fs_mux_poll_task_num +
            ((v9fs_mux_num + 1) % v9fs_mux_poll_task_num ? 1 : 0);

        vptlast = NULL;
        for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++) {
                vpt = &v9fs_mux_poll_tasks[i];
                if (vpt->task != NULL) {
                        vptlast = vpt;
                        if (vpt->muxnum < n) {
                                dprintk(DEBUG_MUX, "put in proc %d\n", i);
                                list_add(&m->mux_list, &vpt->mux_list);
                                vpt->muxnum++;
                                m->poll_task = vpt;
                                memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
                                init_poll_funcptr(&m->pt, v9fs_pollwait);
                                break;
                        }
                }
        }

        if (i >= ARRAY_SIZE(v9fs_mux_poll_tasks)) {
                if (vptlast == NULL) {
                        mutex_unlock(&v9fs_mux_task_lock);
                        return -ENOMEM;
                }

                dprintk(DEBUG_MUX, "put in proc %d\n", i);
                list_add(&m->mux_list, &vptlast->mux_list);
                vptlast->muxnum++;
                m->poll_task = vptlast;
                memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
                init_poll_funcptr(&m->pt, v9fs_pollwait);
        }

        v9fs_mux_num++;
        mutex_unlock(&v9fs_mux_task_lock);

        return 0;
}

static void v9fs_mux_poll_stop(struct v9fs_mux_data *m)
{
        int i;
        struct v9fs_mux_poll_task *vpt;

        mutex_lock(&v9fs_mux_task_lock);
        vpt = m->poll_task;
        list_del(&m->mux_list);
        for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) {
                if (m->poll_waddr[i] != NULL) {
                        remove_wait_queue(m->poll_waddr[i], &m->poll_wait[i]);
                        m->poll_waddr[i] = NULL;
                }
        }
        vpt->muxnum--;
        if (!vpt->muxnum) {
                dprintk(DEBUG_MUX, "destroy proc %p\n", vpt);
                send_sig(SIGKILL, vpt->task, 1);
                vpt->task = NULL;
                v9fs_mux_poll_task_num--;
        }
        v9fs_mux_num--;
        mutex_unlock(&v9fs_mux_task_lock);
}

/**
 * v9fs_mux_init - allocate and initialize the per-session mux data
 * Creates the polling task if this is the first session.
 *
 * @trans: transport structure
 * @msize: maximum message size
 * @extended: pointer to the extended flag
 */
struct v9fs_mux_data *v9fs_mux_init(struct v9fs_transport *trans, int msize,
                                    unsigned char *extended)
{
        int i, n;
        struct v9fs_mux_data *m, *mtmp;

        dprintk(DEBUG_MUX, "transport %p msize %d\n", trans, msize);
        m = kmalloc(sizeof(struct v9fs_mux_data), GFP_KERNEL);
        if (!m)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&m->lock);
        INIT_LIST_HEAD(&m->mux_list);
        m->msize = msize;
        m->extended = extended;
        m->trans = trans;
        idr_init(&m->tagpool.pool);
        init_MUTEX(&m->tagpool.lock);
        m->err = 0;
        init_waitqueue_head(&m->equeue);
        INIT_LIST_HEAD(&m->req_list);
        INIT_LIST_HEAD(&m->unsent_req_list);
        m->rcall = NULL;
        m->rpos = 0;
        m->rbuf = NULL;
        m->wpos = m->wsize = 0;
        m->wbuf = NULL;
        INIT_WORK(&m->rq, v9fs_read_work, m);
        INIT_WORK(&m->wq, v9fs_write_work, m);
        m->wsched = 0;
        memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
        m->poll_task = NULL;
        n = v9fs_mux_poll_start(m);
        if (n) {
                kfree(m);
                return ERR_PTR(n);
        }

        n = trans->poll(trans, &m->pt);
        if (n & POLLIN) {
                dprintk(DEBUG_MUX, "mux %p can read\n", m);
                set_bit(Rpending, &m->wsched);
        }

        if (n & POLLOUT) {
                dprintk(DEBUG_MUX, "mux %p can write\n", m);
                set_bit(Wpending, &m->wsched);
        }

        for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) {
                if (IS_ERR(m->poll_waddr[i])) {
                        v9fs_mux_poll_stop(m);
                        mtmp = (void *)m->poll_waddr[i];        /* the error code */
                        kfree(m);
                        m = mtmp;
                        break;
                }
        }

        return m;
}

/**
 * v9fs_mux_destroy - cancels all pending requests and frees mux resources
 */
void v9fs_mux_destroy(struct v9fs_mux_data *m)
{
        dprintk(DEBUG_MUX, "mux %p prev %p next %p\n", m,
                m->mux_list.prev, m->mux_list.next);
        v9fs_mux_cancel(m, -ECONNRESET);

        if (!list_empty(&m->req_list)) {
                /* wait until all processes waiting on this session exit */
                dprintk(DEBUG_MUX, "mux %p waiting for empty request queue\n",
                        m);
                wait_event_timeout(m->equeue, (list_empty(&m->req_list)), 5000);
                dprintk(DEBUG_MUX, "mux %p request queue empty: %d\n", m,
                        list_empty(&m->req_list));
        }

        v9fs_mux_poll_stop(m);
        m->trans = NULL;

        kfree(m);
}
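
/*
 * Typical session lifecycle (an illustrative sketch, not code from the
 * original source; error handling elided):
 *
 *      m = v9fs_mux_init(trans, msize, &extended);
 *      if (IS_ERR(m))
 *              return PTR_ERR(m);
 *      err = v9fs_mux_rpc(m, tc, &rc);   (send request, wait for response)
 *      ...
 *      v9fs_mux_destroy(m);              (cancel pending requests, free m)
 */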

/**
 * v9fs_pollwait - called by the file's poll operation to add the
 *      v9fs-poll task to the file's wait queue
 */
static void
v9fs_pollwait(struct file *filp, wait_queue_head_t * wait_address,
              poll_table * p)
{
        int i;
        struct v9fs_mux_data *m;

        m = container_of(p, struct v9fs_mux_data, pt);
        for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++)
                if (m->poll_waddr[i] == NULL)
                        break;

        if (i >= ARRAY_SIZE(m->poll_waddr)) {
                dprintk(DEBUG_ERROR, "not enough wait_address slots\n");
                return;
        }

        m->poll_waddr[i] = wait_address;

        if (!wait_address) {
                dprintk(DEBUG_ERROR, "no wait_address\n");
                m->poll_waddr[i] = ERR_PTR(-EIO);
                return;
        }

        init_waitqueue_entry(&m->poll_wait[i], m->poll_task->task);
        add_wait_queue(wait_address, &m->poll_wait[i]);
}

/**
 * v9fs_poll_mux - polls a mux and schedules read or write works if necessary
 */
static void v9fs_poll_mux(struct v9fs_mux_data *m)
{
        int n;

        if (m->err < 0)
                return;

        n = m->trans->poll(m->trans, NULL);
        if (n < 0 || n & (POLLERR | POLLHUP | POLLNVAL)) {
                dprintk(DEBUG_MUX, "error mux %p err %d\n", m, n);
                if (n >= 0)
                        n = -ECONNRESET;
                v9fs_mux_cancel(m, n);
        }

        if (n & POLLIN) {
                set_bit(Rpending, &m->wsched);
                dprintk(DEBUG_MUX, "mux %p can read\n", m);
                if (!test_and_set_bit(Rworksched, &m->wsched)) {
                        dprintk(DEBUG_MUX, "schedule read work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->rq);
                }
        }

        if (n & POLLOUT) {
                set_bit(Wpending, &m->wsched);
                dprintk(DEBUG_MUX, "mux %p can write\n", m);
                if ((m->wsize || !list_empty(&m->unsent_req_list))
                    && !test_and_set_bit(Wworksched, &m->wsched)) {
                        dprintk(DEBUG_MUX, "schedule write work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->wq);
                }
        }
}
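
/*
 * Note (editorial summary): Rpending/Wpending cache the most recent
 * poll result so the work functions can retry without polling again,
 * while Rworksched/Wworksched ensure at most one read (resp. write)
 * work item per mux is queued or running at a time -- set atomically
 * with test_and_set_bit() above and cleared by the work functions.
 */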

/**
 * v9fs_poll_proc - polls all v9fs transports for new events and queues
 *      the appropriate work to the work queue
 */
static int v9fs_poll_proc(void *a)
{
        struct v9fs_mux_data *m, *mtmp;
        struct v9fs_mux_poll_task *vpt;

        vpt = a;
        dprintk(DEBUG_MUX, "start %p %p\n", current, vpt);
        allow_signal(SIGKILL);
        while (!kthread_should_stop()) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (signal_pending(current))
                        break;

                list_for_each_entry_safe(m, mtmp, &vpt->mux_list, mux_list) {
                        v9fs_poll_mux(m);
                }

                dprintk(DEBUG_MUX, "sleeping...\n");
                schedule_timeout(SCHED_TIMEOUT * HZ);
        }

        __set_current_state(TASK_RUNNING);
        dprintk(DEBUG_MUX, "finish\n");
        return 0;
}

/**
 * v9fs_write_work - called when a transport can send some data
 */
static void v9fs_write_work(void *a)
{
        int n, err;
        struct v9fs_mux_data *m;
        struct v9fs_req *req;

        m = a;

        if (m->err < 0) {
                clear_bit(Wworksched, &m->wsched);
                return;
        }

        if (!m->wsize) {
                if (list_empty(&m->unsent_req_list)) {
                        clear_bit(Wworksched, &m->wsched);
                        return;
                }

                spin_lock(&m->lock);
again:
                req = list_entry(m->unsent_req_list.next, struct v9fs_req,
                               req_list);
                list_move_tail(&req->req_list, &m->req_list);
                if (req->err == ERREQFLUSH)
                        goto again;

                m->wbuf = req->tcall->sdata;
                m->wsize = req->tcall->size;
                m->wpos = 0;
                dump_data(m->wbuf, m->wsize);
                spin_unlock(&m->lock);
        }

        dprintk(DEBUG_MUX, "mux %p pos %d size %d\n", m, m->wpos, m->wsize);
        clear_bit(Wpending, &m->wsched);
        err = m->trans->write(m->trans, m->wbuf + m->wpos, m->wsize - m->wpos);
        dprintk(DEBUG_MUX, "mux %p sent %d bytes\n", m, err);
        if (err == -EAGAIN) {
                clear_bit(Wworksched, &m->wsched);
                return;
        }

        if (err <= 0)
                goto error;

        m->wpos += err;
        if (m->wpos == m->wsize)
                m->wpos = m->wsize = 0;

        if (m->wsize == 0 && !list_empty(&m->unsent_req_list)) {
                if (test_and_clear_bit(Wpending, &m->wsched))
                        n = POLLOUT;
                else
                        n = m->trans->poll(m->trans, NULL);

                if (n & POLLOUT) {
                        dprintk(DEBUG_MUX, "schedule write work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->wq);
                } else
                        clear_bit(Wworksched, &m->wsched);
        } else
                clear_bit(Wworksched, &m->wsched);

        return;

error:
        v9fs_mux_cancel(m, err);
        clear_bit(Wworksched, &m->wsched);
}

static void process_request(struct v9fs_mux_data *m, struct v9fs_req *req)
{
        int ecode;
        struct v9fs_str *ename;

        if (!req->err && req->rcall->id == RERROR) {
                ecode = req->rcall->params.rerror.errno;
                ename = &req->rcall->params.rerror.error;

                dprintk(DEBUG_MUX, "Rerror %.*s\n", ename->len, ename->str);

                if (*m->extended)
                        req->err = -ecode;

                if (!req->err) {
                        req->err = v9fs_errstr2errno(ename->str, ename->len);

                        if (!req->err) {        /* string match failed */
                                PRINT_FCALL_ERROR("unknown error", req->rcall);
                        }

                        if (!req->err)
                                req->err = -ESERVERFAULT;
                }
        } else if (req->tcall && req->rcall->id != req->tcall->id + 1) {
                dprintk(DEBUG_ERROR, "fcall mismatch: expected %d, got %d\n",
                        req->tcall->id + 1, req->rcall->id);
                if (!req->err)
                        req->err = -EIO;
        }
}
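
/*
 * Note on 9P error handling (editorial summary): in extended (9P2000.u)
 * mode an Rerror message carries a numeric errno that can be used
 * directly; in plain 9P2000 mode only the error string is available and
 * it has to be mapped to an errno via v9fs_errstr2errno(), falling back
 * to -ESERVERFAULT when the string is not recognized.
 */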

/**
 * v9fs_read_work - called when there is some data to be read from a transport
 */
static void v9fs_read_work(void *a)
{
        int n, err;
        struct v9fs_mux_data *m;
        struct v9fs_req *req, *rptr, *rreq;
        struct v9fs_fcall *rcall;
        char *rbuf;

        m = a;

        if (m->err < 0)
                return;

        rcall = NULL;
        dprintk(DEBUG_MUX, "start mux %p pos %d\n", m, m->rpos);

        if (!m->rcall) {
                m->rcall =
                    kmalloc(sizeof(struct v9fs_fcall) + m->msize, GFP_KERNEL);
                if (!m->rcall) {
                        err = -ENOMEM;
                        goto error;
                }

                m->rbuf = (char *)m->rcall + sizeof(struct v9fs_fcall);
                m->rpos = 0;
        }

        clear_bit(Rpending, &m->wsched);
        err = m->trans->read(m->trans, m->rbuf + m->rpos, m->msize - m->rpos);
        dprintk(DEBUG_MUX, "mux %p got %d bytes\n", m, err);
        if (err == -EAGAIN) {
                clear_bit(Rworksched, &m->wsched);
                return;
        }

        if (err <= 0)
                goto error;

        m->rpos += err;
        while (m->rpos > 4) {
                n = le32_to_cpu(*(__le32 *) m->rbuf);
                if (n >= m->msize) {
                        dprintk(DEBUG_ERROR,
                                "requested packet size too big: %d\n", n);
                        err = -EIO;
                        goto error;
                }

                if (m->rpos < n)
                        break;

                dump_data(m->rbuf, n);
                err =
                    v9fs_deserialize_fcall(m->rbuf, n, m->rcall, *m->extended);
                if (err < 0) {
                        goto error;
                }

                if ((v9fs_debug_level & DEBUG_FCALL) == DEBUG_FCALL) {
                        char buf[150];

                        v9fs_printfcall(buf, sizeof(buf), m->rcall,
                                *m->extended);
                        printk(KERN_NOTICE ">>> %p %s\n", m, buf);
                }

                rcall = m->rcall;
                rbuf = m->rbuf;
                if (m->rpos > n) {
                        m->rcall = kmalloc(sizeof(struct v9fs_fcall) + m->msize,
                                           GFP_KERNEL);
                        if (!m->rcall) {
                                err = -ENOMEM;
                                goto error;
                        }

                        m->rbuf = (char *)m->rcall + sizeof(struct v9fs_fcall);
                        memmove(m->rbuf, rbuf + n, m->rpos - n);
                        m->rpos -= n;
                } else {
                        m->rcall = NULL;
                        m->rbuf = NULL;
                        m->rpos = 0;
                }

                dprintk(DEBUG_MUX, "mux %p fcall id %d tag %d\n", m, rcall->id,
                        rcall->tag);

                req = NULL;
                spin_lock(&m->lock);
                list_for_each_entry_safe(rreq, rptr, &m->req_list, req_list) {
                        if (rreq->tag == rcall->tag) {
                                req = rreq;
                                if (req->flush != Flushing)
                                        list_del(&req->req_list);
                                break;
                        }
                }
                spin_unlock(&m->lock);

                if (req) {
                        req->rcall = rcall;
                        process_request(m, req);

                        if (req->flush != Flushing) {
                                if (req->cb)
                                        (*req->cb) (req, req->cba);
                                else
                                        kfree(req->rcall);

                                wake_up(&m->equeue);
                        }
                } else {
                        if (err >= 0 && rcall->id != RFLUSH)
                                dprintk(DEBUG_ERROR,
                                        "unexpected response mux %p id %d tag %d\n",
                                        m, rcall->id, rcall->tag);
                        kfree(rcall);
                }
        }

        if (!list_empty(&m->req_list)) {
                if (test_and_clear_bit(Rpending, &m->wsched))
                        n = POLLIN;
                else
                        n = m->trans->poll(m->trans, NULL);

                if (n & POLLIN) {
                        dprintk(DEBUG_MUX, "schedule read work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->rq);
                } else
                        clear_bit(Rworksched, &m->wsched);
        } else
                clear_bit(Rworksched, &m->wsched);

        return;

error:
        v9fs_mux_cancel(m, err);
        clear_bit(Rworksched, &m->wsched);
}

/**
 * v9fs_send_request - send 9P request
 * The function can sleep until the request is scheduled for sending.
 * The function can be interrupted. Returning from the function is not
 * a guarantee that the request was sent successfully. On failure, an
 * ERR_PTR is returned and the error can be retrieved with PTR_ERR.
 *
 * @m: mux data
 * @tc: request to be sent
 * @cb: callback function to call when response is received
 * @cba: parameter to pass to the callback function
 */
static struct v9fs_req *v9fs_send_request(struct v9fs_mux_data *m,
                                          struct v9fs_fcall *tc,
                                          v9fs_mux_req_callback cb, void *cba)
{
        int n;
        struct v9fs_req *req;

        dprintk(DEBUG_MUX, "mux %p task %p tcall %p id %d\n", m, current,
                tc, tc->id);
        if (m->err < 0)
                return ERR_PTR(m->err);

        req = kmalloc(sizeof(struct v9fs_req), GFP_KERNEL);
        if (!req)
                return ERR_PTR(-ENOMEM);

        if (tc->id == TVERSION)
                n = V9FS_NOTAG;
        else
                n = v9fs_mux_get_tag(m);

        if (n < 0) {
                kfree(req);
                return ERR_PTR(-ENOMEM);
        }

        v9fs_set_tag(tc, n);
        if ((v9fs_debug_level & DEBUG_FCALL) == DEBUG_FCALL) {
                char buf[150];

                v9fs_printfcall(buf, sizeof(buf), tc, *m->extended);
                printk(KERN_NOTICE "<<< %p %s\n", m, buf);
        }

        spin_lock_init(&req->lock);
        req->tag = n;
        req->tcall = tc;
        req->rcall = NULL;
        req->err = 0;
        req->cb = cb;
        req->cba = cba;
        req->flush = None;

        spin_lock(&m->lock);
        list_add_tail(&req->req_list, &m->unsent_req_list);
        spin_unlock(&m->lock);

        if (test_and_clear_bit(Wpending, &m->wsched))
                n = POLLOUT;
        else
                n = m->trans->poll(m->trans, NULL);

        if (n & POLLOUT && !test_and_set_bit(Wworksched, &m->wsched))
                queue_work(v9fs_mux_wq, &m->wq);

        return req;
}

static void v9fs_mux_free_request(struct v9fs_mux_data *m, struct v9fs_req *req)
{
        v9fs_mux_put_tag(m, req->tag);
        kfree(req);
}

static void v9fs_mux_flush_cb(struct v9fs_req *freq, void *a)
{
        v9fs_mux_req_callback cb;
        int tag;
        struct v9fs_mux_data *m;
        struct v9fs_req *req, *rreq, *rptr;

        m = a;
        dprintk(DEBUG_MUX, "mux %p tc %p rc %p err %d oldtag %d\n", m,
                freq->tcall, freq->rcall, freq->err,
                freq->tcall->params.tflush.oldtag);

        spin_lock(&m->lock);
        cb = NULL;
        tag = freq->tcall->params.tflush.oldtag;
        req = NULL;
        list_for_each_entry_safe(rreq, rptr, &m->req_list, req_list) {
                if (rreq->tag == tag) {
                        req = rreq;
                        list_del(&req->req_list);
                        break;
                }
        }
        spin_unlock(&m->lock);

        if (req) {
                spin_lock(&req->lock);
                req->flush = Flushed;
                spin_unlock(&req->lock);

                if (req->cb)
                        (*req->cb) (req, req->cba);
                else
                        kfree(req->rcall);

                wake_up(&m->equeue);
        }

        kfree(freq->tcall);
        kfree(freq->rcall);
        v9fs_mux_free_request(m, freq);
}

static int
v9fs_mux_flush_request(struct v9fs_mux_data *m, struct v9fs_req *req)
{
        struct v9fs_fcall *fc;
        struct v9fs_req *rreq, *rptr;

        dprintk(DEBUG_MUX, "mux %p req %p tag %d\n", m, req, req->tag);

        /* if a response was received for a request, do nothing */
        spin_lock(&req->lock);
        if (req->rcall || req->err) {
                spin_unlock(&req->lock);
                dprintk(DEBUG_MUX,
                        "mux %p req %p response already received\n", m, req);
                return 0;
        }

        req->flush = Flushing;
        spin_unlock(&req->lock);

        spin_lock(&m->lock);
        /* if the request is not sent yet, just remove it from the list */
        list_for_each_entry_safe(rreq, rptr, &m->unsent_req_list, req_list) {
                if (rreq->tag == req->tag) {
                        dprintk(DEBUG_MUX,
                                "mux %p req %p request is not sent yet\n",
                                m, req);
                        list_del(&rreq->req_list);
                        req->flush = Flushed;
                        spin_unlock(&m->lock);
                        if (req->cb)
                                (*req->cb) (req, req->cba);
                        return 0;
                }
        }
        spin_unlock(&m->lock);

        clear_thread_flag(TIF_SIGPENDING);
        fc = v9fs_create_tflush(req->tag);
        v9fs_send_request(m, fc, v9fs_mux_flush_cb, m);
        return 1;
}
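
/*
 * Background (9P protocol): Tflush carries the tag of an earlier request
 * (oldtag) and asks the server to abort it.  Once the matching Rflush
 * arrives, any response to the old request must be ignored, which is
 * what the None/Flushing/Flushed states defined above keep track of.
 */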

static void
v9fs_mux_rpc_cb(struct v9fs_req *req, void *a)
{
        struct v9fs_mux_rpc *r;

        dprintk(DEBUG_MUX, "req %p r %p\n", req, a);
        r = a;
        r->rcall = req->rcall;
        r->err = req->err;

        if (req->flush != None && !req->err)
                r->err = -ERESTARTSYS;

        wake_up(&r->wqueue);
}

/**
 * v9fs_mux_rpc - sends 9P request and waits until a response is available.
 *      The function can be interrupted.
 * @m: mux data
 * @tc: request to be sent
 * @rc: pointer where a pointer to the response is stored
 */
int
v9fs_mux_rpc(struct v9fs_mux_data *m, struct v9fs_fcall *tc,
             struct v9fs_fcall **rc)
{
        int err, sigpending;
        unsigned long flags;
        struct v9fs_req *req;
        struct v9fs_mux_rpc r;

        r.err = 0;
        r.tcall = tc;
        r.rcall = NULL;
        r.m = m;
        init_waitqueue_head(&r.wqueue);

        if (rc)
                *rc = NULL;

        sigpending = 0;
        if (signal_pending(current)) {
                sigpending = 1;
                clear_thread_flag(TIF_SIGPENDING);
        }

        req = v9fs_send_request(m, tc, v9fs_mux_rpc_cb, &r);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                dprintk(DEBUG_MUX, "error %d\n", err);
                return err;
        }

        err = wait_event_interruptible(r.wqueue, r.rcall != NULL || r.err < 0);
        if (r.err < 0)
                err = r.err;

        if (err == -ERESTARTSYS && m->trans->status == Connected
            && m->err == 0) {
                if (v9fs_mux_flush_request(m, req)) {
                        /* wait until we get the response to the flush message */
                        do {
                                clear_thread_flag(TIF_SIGPENDING);
                                err = wait_event_interruptible(r.wqueue,
                                        r.rcall || r.err);
                        } while (!r.rcall && !r.err && err == -ERESTARTSYS &&
                                m->trans->status == Connected && !m->err);
                }
                sigpending = 1;
        }

        if (sigpending) {
                spin_lock_irqsave(&current->sighand->siglock, flags);
                recalc_sigpending();
                spin_unlock_irqrestore(&current->sighand->siglock, flags);
        }

        if (rc)
                *rc = r.rcall;
        else
                kfree(r.rcall);

        v9fs_mux_free_request(m, req);
        if (err > 0)
                err = -EIO;

        return err;
}
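
/*
 * Example call (illustrative sketch; the tcall would normally be built
 * by one of the v9fs_create_t* helpers):
 *
 *      struct v9fs_fcall *rc = NULL;
 *      int err = v9fs_mux_rpc(m, tc, &rc);
 *      if (!err) {
 *              ... use the response ...
 *              kfree(rc);
 *      }
 */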

#if 0
/**
 * v9fs_mux_rpcnb - sends 9P request without waiting for response.
 * @m: mux data
 * @tc: request to be sent
 * @cb: callback function to be called when response arrives
 * @a: value to pass to the callback function
 */
int v9fs_mux_rpcnb(struct v9fs_mux_data *m, struct v9fs_fcall *tc,
                   v9fs_mux_req_callback cb, void *a)
{
        int err;
        struct v9fs_req *req;

        req = v9fs_send_request(m, tc, cb, a);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                dprintk(DEBUG_MUX, "error %d\n", err);
                return err;
        }

        dprintk(DEBUG_MUX, "mux %p tc %p tag %d\n", m, tc, req->tag);
        return 0;
}
#endif  /*  0  */

/**
 * v9fs_mux_cancel - cancel all pending requests with error
 * @m: mux data
 * @err: error code
 */
void v9fs_mux_cancel(struct v9fs_mux_data *m, int err)
{
        struct v9fs_req *req, *rtmp;
        LIST_HEAD(cancel_list);

        dprintk(DEBUG_ERROR, "mux %p err %d\n", m, err);
        m->err = err;
        spin_lock(&m->lock);
        list_for_each_entry_safe(req, rtmp, &m->req_list, req_list) {
                list_move(&req->req_list, &cancel_list);
        }
        list_for_each_entry_safe(req, rtmp, &m->unsent_req_list, req_list) {
                list_move(&req->req_list, &cancel_list);
        }
        spin_unlock(&m->lock);

        list_for_each_entry_safe(req, rtmp, &cancel_list, req_list) {
                list_del(&req->req_list);
                if (!req->err)
                        req->err = err;

                if (req->cb)
                        (*req->cb) (req, req->cba);
                else
                        kfree(req->rcall);
        }

        wake_up(&m->equeue);
}

static u16 v9fs_mux_get_tag(struct v9fs_mux_data *m)
{
        int tag;

        tag = v9fs_get_idpool(&m->tagpool);
        if (tag < 0)
                return V9FS_NOTAG;
        else
                return (u16) tag;
}

static void v9fs_mux_put_tag(struct v9fs_mux_data *m, u16 tag)
{
        if (tag != V9FS_NOTAG && v9fs_check_idpool(tag, &m->tagpool))
                v9fs_put_idpool(tag, &m->tagpool);
}
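
/*
 * Tag discipline (editorial note): v9fs_send_request allocates a tag
 * from the pool for every request except TVERSION, which must use
 * V9FS_NOTAG per the 9P protocol, and v9fs_mux_free_request returns it,
 * so each tag is unique among the requests in flight on a connection.
 */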