[PATCH] v9fs: new multiplexer implementation
/*
 * linux/fs/9p/mux.c
 *
 * Protocol Multiplexer
 *
 *  Copyright (C) 2004 by Eric Van Hensbergen <ericvh@gmail.com>
 *  Copyright (C) 2004-2005 by Latchesar Ionkov <lucho@ionkov.net>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to:
 *  Free Software Foundation
 *  51 Franklin Street, Fifth Floor
 *  Boston, MA  02111-1301  USA
 *
 */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/errno.h>
#include <linux/fs.h>
#include <linux/poll.h>
#include <linux/kthread.h>
#include <linux/idr.h>

#include "debug.h"
#include "v9fs.h"
#include "9p.h"
#include "transport.h"
#include "conv.h"
#include "mux.h"

#define ERREQFLUSH      1
#define SCHED_TIMEOUT   10
#define MAXPOLLWADDR    2

enum {
        Rworksched = 1,         /* read work scheduled or running */
        Rpending = 2,           /* can read */
        Wworksched = 4,         /* write work scheduled or running */
        Wpending = 8,           /* can write */
};
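
/*
 * The flags above live in the wsched bitmask of struct v9fs_mux_data and
 * are manipulated atomically with set_bit()/test_and_set_bit(), so the
 * read and write work functions can be scheduled at most once each.
 */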

struct v9fs_mux_poll_task;

struct v9fs_req {
        int tag;
        struct v9fs_fcall *tcall;
        struct v9fs_fcall *rcall;
        int err;
        v9fs_mux_req_callback cb;
        void *cba;
        struct list_head req_list;
};

struct v9fs_mux_data {
        spinlock_t lock;
        struct list_head mux_list;
        struct v9fs_mux_poll_task *poll_task;
        int msize;
        unsigned char *extended;
        struct v9fs_transport *trans;
        struct v9fs_idpool tidpool;
        int err;
        wait_queue_head_t equeue;
        struct list_head req_list;
        struct list_head unsent_req_list;
        int rpos;
        char *rbuf;
        int wpos;
        int wsize;
        char *wbuf;
        wait_queue_t poll_wait[MAXPOLLWADDR];
        wait_queue_head_t *poll_waddr[MAXPOLLWADDR];
        poll_table pt;
        struct work_struct rq;
        struct work_struct wq;
        unsigned long wsched;
};
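
/*
 * Note: rbuf and wbuf are not separate allocations; v9fs_mux_init()
 * performs one kmalloc() of the structure plus 2 * msize bytes and
 * points rbuf and wbuf into the tail of that allocation.
 */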

struct v9fs_mux_poll_task {
        struct task_struct *task;
        struct list_head mux_list;
        int muxnum;
};

struct v9fs_mux_rpc {
        struct v9fs_mux_data *m;
        struct v9fs_req *req;
        int err;
        struct v9fs_fcall *rcall;
        wait_queue_head_t wqueue;
};
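
/*
 * Per-call state for the synchronous v9fs_mux_rpc() path: the caller
 * sleeps on wqueue until v9fs_mux_rpc_cb() fills in rcall and err and
 * wakes it up.
 */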

static int v9fs_poll_proc(void *);
static void v9fs_read_work(void *);
static void v9fs_write_work(void *);
static void v9fs_pollwait(struct file *filp, wait_queue_head_t * wait_address,
                          poll_table * p);

static DECLARE_MUTEX(v9fs_mux_task_lock);
static struct workqueue_struct *v9fs_mux_wq;

static int v9fs_mux_num;
static int v9fs_mux_poll_task_num;
static struct v9fs_mux_poll_task v9fs_mux_poll_tasks[100];

void v9fs_mux_global_init(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++)
                v9fs_mux_poll_tasks[i].task = NULL;

        v9fs_mux_wq = create_workqueue("v9fs");
}

void v9fs_mux_global_exit(void)
{
        destroy_workqueue(v9fs_mux_wq);
}

/**
 * v9fs_mux_calc_poll_procs - calculates the number of polling procs
 * based on the number of mounted v9fs filesystems.
 *
 * The current implementation returns the number of mounts divided by
 * the current number of poll tasks, rounded up (or 1 if there are no
 * poll tasks yet), capped at the size of the poll task array.
 */
inline int v9fs_mux_calc_poll_procs(int muxnum)
{
        int n;

        if (v9fs_mux_poll_task_num)
                n = muxnum / v9fs_mux_poll_task_num +
                    (muxnum % v9fs_mux_poll_task_num ? 1 : 0);
        else
                n = 1;

        if (n > ARRAY_SIZE(v9fs_mux_poll_tasks))
                n = ARRAY_SIZE(v9fs_mux_poll_tasks);

        return n;
}
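
/*
 * Example: with two poll tasks running and a fifth mount arriving,
 * v9fs_mux_calc_poll_procs(5) returns 5 / 2 rounded up, i.e. 3.
 */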

static void v9fs_mux_poll_start(struct v9fs_mux_data *m)
{
        int i, n;
        struct v9fs_mux_poll_task *vpt, *vptlast;

        dprintk(DEBUG_MUX, "mux %p muxnum %d procnum %d\n", m, v9fs_mux_num,
                v9fs_mux_poll_task_num);
        down(&v9fs_mux_task_lock);

        n = v9fs_mux_calc_poll_procs(v9fs_mux_num + 1);
        if (n > v9fs_mux_poll_task_num) {
                for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++) {
                        if (v9fs_mux_poll_tasks[i].task == NULL) {
                                vpt = &v9fs_mux_poll_tasks[i];
                                dprintk(DEBUG_MUX, "create proc %p\n", vpt);
                                vpt->task = kthread_create(v9fs_poll_proc,
                                        vpt, "v9fs-poll");
                                INIT_LIST_HEAD(&vpt->mux_list);
                                vpt->muxnum = 0;
                                v9fs_mux_poll_task_num++;
                                wake_up_process(vpt->task);
                                break;
                        }
                }

                if (i >= ARRAY_SIZE(v9fs_mux_poll_tasks))
                        dprintk(DEBUG_ERROR, "warning: no free poll slots\n");
        }

        n = (v9fs_mux_num + 1) / v9fs_mux_poll_task_num +
            ((v9fs_mux_num + 1) % v9fs_mux_poll_task_num ? 1 : 0);

        vptlast = NULL;
        for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++) {
                vpt = &v9fs_mux_poll_tasks[i];
                if (vpt->task != NULL) {
                        vptlast = vpt;
                        if (vpt->muxnum < n) {
                                dprintk(DEBUG_MUX, "put in proc %d\n", i);
                                list_add(&m->mux_list, &vpt->mux_list);
                                vpt->muxnum++;
                                m->poll_task = vpt;
                                memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
                                init_poll_funcptr(&m->pt, v9fs_pollwait);
                                break;
                        }
                }
        }

        if (i >= ARRAY_SIZE(v9fs_mux_poll_tasks)) {
                dprintk(DEBUG_MUX, "put in proc %d\n", i);
                list_add(&m->mux_list, &vptlast->mux_list);
                vptlast->muxnum++;
                m->poll_task = vptlast;
                memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
                init_poll_funcptr(&m->pt, v9fs_pollwait);
        }

        v9fs_mux_num++;
        up(&v9fs_mux_task_lock);
}

static void v9fs_mux_poll_stop(struct v9fs_mux_data *m)
{
        int i;
        struct v9fs_mux_poll_task *vpt;

        down(&v9fs_mux_task_lock);
        vpt = m->poll_task;
        list_del(&m->mux_list);
        for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) {
                /* skip slots holding ERR_PTR values stored by v9fs_pollwait */
                if (m->poll_waddr[i] != NULL && !IS_ERR(m->poll_waddr[i])) {
                        remove_wait_queue(m->poll_waddr[i], &m->poll_wait[i]);
                        m->poll_waddr[i] = NULL;
                }
        }
        vpt->muxnum--;
        if (!vpt->muxnum) {
                dprintk(DEBUG_MUX, "destroy proc %p\n", vpt);
                send_sig(SIGKILL, vpt->task, 1);
                vpt->task = NULL;
                v9fs_mux_poll_task_num--;
        }
        v9fs_mux_num--;
        up(&v9fs_mux_task_lock);
}

/**
 * v9fs_mux_init - allocate and initialize the per-session mux data.
 *      Creates the polling task if this is the first session.
 *
 * @trans: transport structure
 * @msize: maximum message size
 * @extended: pointer to the extended flag
 */
struct v9fs_mux_data *v9fs_mux_init(struct v9fs_transport *trans, int msize,
                                    unsigned char *extended)
{
        int i, n;
        struct v9fs_mux_data *m, *mtmp;

        dprintk(DEBUG_MUX, "transport %p msize %d\n", trans, msize);
        m = kmalloc(sizeof(struct v9fs_mux_data) + 2 * msize, GFP_KERNEL);
        if (!m)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&m->lock);
        INIT_LIST_HEAD(&m->mux_list);
        m->msize = msize;
        m->extended = extended;
        m->trans = trans;
        idr_init(&m->tidpool.pool);
        init_MUTEX(&m->tidpool.lock);
        m->err = 0;
        init_waitqueue_head(&m->equeue);
        INIT_LIST_HEAD(&m->req_list);
        INIT_LIST_HEAD(&m->unsent_req_list);
        m->rpos = 0;
        m->rbuf = (char *)m + sizeof(struct v9fs_mux_data);
        m->wpos = m->wsize = 0;
        m->wbuf = m->rbuf + msize;
        INIT_WORK(&m->rq, v9fs_read_work, m);
        INIT_WORK(&m->wq, v9fs_write_work, m);
        m->wsched = 0;
        memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
        v9fs_mux_poll_start(m);

        n = trans->poll(trans, &m->pt);
        if (n & POLLIN) {
                dprintk(DEBUG_MUX, "mux %p can read\n", m);
                set_bit(Rpending, &m->wsched);
        }

        if (n & POLLOUT) {
                dprintk(DEBUG_MUX, "mux %p can write\n", m);
                set_bit(Wpending, &m->wsched);
        }

        for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) {
                if (IS_ERR(m->poll_waddr[i])) {
                        v9fs_mux_poll_stop(m);
                        mtmp = (void *)m->poll_waddr[i];        /* the error code */
                        kfree(m);
                        m = mtmp;
                        break;
                }
        }

        return m;
}
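
/*
 * Example (sketch): a session would set up its multiplexer roughly as
 *
 *      m = v9fs_mux_init(trans, msize, &v9ses->extended);
 *      if (IS_ERR(m))
 *              return PTR_ERR(m);
 *
 * where v9ses stands for the caller's session structure; the name is
 * assumed here for illustration and is not part of this file.
 */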

/**
 * v9fs_mux_destroy - cancels all pending requests and frees mux resources
 */
void v9fs_mux_destroy(struct v9fs_mux_data *m)
{
        dprintk(DEBUG_MUX, "mux %p prev %p next %p\n", m,
                m->mux_list.prev, m->mux_list.next);
        v9fs_mux_cancel(m, -ECONNRESET);

        if (!list_empty(&m->req_list)) {
                /* wait until all processes waiting on this session exit */
                dprintk(DEBUG_MUX, "mux %p waiting for empty request queue\n",
                        m);
                wait_event_timeout(m->equeue, (list_empty(&m->req_list)), 5000);
                dprintk(DEBUG_MUX, "mux %p request queue empty: %d\n", m,
                        list_empty(&m->req_list));
        }

        v9fs_mux_poll_stop(m);
        m->trans = NULL;

        kfree(m);
}

/**
 * v9fs_pollwait - called by the file's poll operation to add the
 *      v9fs-poll task to the file's wait queue
 */
static void
v9fs_pollwait(struct file *filp, wait_queue_head_t * wait_address,
              poll_table * p)
{
        int i;
        struct v9fs_mux_data *m;

        m = container_of(p, struct v9fs_mux_data, pt);
        for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++)
                if (m->poll_waddr[i] == NULL)
                        break;

        if (i >= ARRAY_SIZE(m->poll_waddr)) {
                dprintk(DEBUG_ERROR, "not enough wait_address slots\n");
                return;
        }

        m->poll_waddr[i] = wait_address;

        if (!wait_address) {
                dprintk(DEBUG_ERROR, "no wait_address\n");
                m->poll_waddr[i] = ERR_PTR(-EIO);
                return;
        }

        init_waitqueue_entry(&m->poll_wait[i], m->poll_task->task);
        add_wait_queue(wait_address, &m->poll_wait[i]);
}
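
/*
 * v9fs_pollwait() is the callback registered through init_poll_funcptr():
 * instead of blocking, it records the transport's wait queues and puts
 * the poll task on them, so transport activity wakes v9fs_poll_proc().
 */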

/**
 * v9fs_poll_mux - polls a mux and schedules read or write work if necessary
 */
static inline void v9fs_poll_mux(struct v9fs_mux_data *m)
{
        int n;

        if (m->err < 0)
                return;

        n = m->trans->poll(m->trans, NULL);
        if (n < 0 || n & (POLLERR | POLLHUP | POLLNVAL)) {
                dprintk(DEBUG_MUX, "error mux %p err %d\n", m, n);
                if (n >= 0)
                        n = -ECONNRESET;
                v9fs_mux_cancel(m, n);
                return;
        }

        if (n & POLLIN) {
                set_bit(Rpending, &m->wsched);
                dprintk(DEBUG_MUX, "mux %p can read\n", m);
                if (!test_and_set_bit(Rworksched, &m->wsched)) {
                        dprintk(DEBUG_MUX, "schedule read work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->rq);
                }
        }

        if (n & POLLOUT) {
                set_bit(Wpending, &m->wsched);
                dprintk(DEBUG_MUX, "mux %p can write\n", m);
                if ((m->wsize || !list_empty(&m->unsent_req_list))
                    && !test_and_set_bit(Wworksched, &m->wsched)) {
                        dprintk(DEBUG_MUX, "schedule write work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->wq);
                }
        }
}

/**
 * v9fs_poll_proc - polls all v9fs transports for new events and queues
 *      the appropriate work to the work queue
 */
static int v9fs_poll_proc(void *a)
{
        struct v9fs_mux_data *m, *mtmp;
        struct v9fs_mux_poll_task *vpt;

        vpt = a;
        dprintk(DEBUG_MUX, "start %p %p\n", current, vpt);
        allow_signal(SIGKILL);
        while (!kthread_should_stop()) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (signal_pending(current))
                        break;

                list_for_each_entry_safe(m, mtmp, &vpt->mux_list, mux_list) {
                        v9fs_poll_mux(m);
                }

                dprintk(DEBUG_MUX, "sleeping...\n");
                schedule_timeout(SCHED_TIMEOUT * HZ);
        }

        __set_current_state(TASK_RUNNING);
        dprintk(DEBUG_MUX, "finish\n");
        return 0;
}

static inline int v9fs_write_req(struct v9fs_mux_data *m, struct v9fs_req *req)
{
        int n;

        list_move_tail(&req->req_list, &m->req_list);
        n = v9fs_serialize_fcall(req->tcall, m->wbuf, m->msize, *m->extended);
        if (n < 0) {
                req->err = n;
                list_del(&req->req_list);
                if (req->cb) {
                        spin_unlock(&m->lock);
                        (*req->cb) (req->cba, req->tcall, req->rcall, req->err);
                        req->cb = NULL;
                        spin_lock(&m->lock);
                } else
                        kfree(req->rcall);

                kfree(req);
        }

        return n;
}
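
/*
 * Note: v9fs_write_req() is called with m->lock held and may drop and
 * reacquire the lock around the completion callback, which is why the
 * caller below walks the unsent list with the _safe iterator.
 */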

/**
 * v9fs_write_work - called when a transport can send some data
 */
static void v9fs_write_work(void *a)
{
        int n, err;
        struct v9fs_mux_data *m;
        struct v9fs_req *req, *rtmp;

        m = a;

        if (m->err < 0) {
                clear_bit(Wworksched, &m->wsched);
                return;
        }

        if (!m->wsize) {
                if (list_empty(&m->unsent_req_list)) {
                        clear_bit(Wworksched, &m->wsched);
                        return;
                }

                err = 0;
                spin_lock(&m->lock);
                list_for_each_entry_safe(req, rtmp, &m->unsent_req_list,
                                         req_list) {
                        err = v9fs_write_req(m, req);
                        if (err > 0)
                                break;
                }

                m->wsize = err;
                m->wpos = 0;
                spin_unlock(&m->lock);
        }

        dprintk(DEBUG_MUX, "mux %p pos %d size %d\n", m, m->wpos, m->wsize);
        clear_bit(Wpending, &m->wsched);
        err = m->trans->write(m->trans, m->wbuf + m->wpos, m->wsize - m->wpos);
        dprintk(DEBUG_MUX, "mux %p sent %d bytes\n", m, err);
        if (err == -EAGAIN) {
                clear_bit(Wworksched, &m->wsched);
                return;
        }

        if (err <= 0)
                goto error;

        m->wpos += err;
        if (m->wpos == m->wsize)
                m->wpos = m->wsize = 0;

        if (m->wsize == 0 && !list_empty(&m->unsent_req_list)) {
                if (test_and_clear_bit(Wpending, &m->wsched))
                        n = POLLOUT;
                else
                        n = m->trans->poll(m->trans, NULL);

                if (n & POLLOUT) {
                        dprintk(DEBUG_MUX, "schedule write work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->wq);
                } else
                        clear_bit(Wworksched, &m->wsched);
        } else
                clear_bit(Wworksched, &m->wsched);

        return;

      error:
        v9fs_mux_cancel(m, err);
        clear_bit(Wworksched, &m->wsched);
}

static void process_request(struct v9fs_mux_data *m, struct v9fs_req *req)
{
        int ecode, tag;
        char *ename;

        tag = req->tag;
        if (req->rcall->id == RERROR && !req->err) {
                ecode = req->rcall->params.rerror.errno;
                ename = req->rcall->params.rerror.error;

                dprintk(DEBUG_MUX, "Rerror %s\n", ename);

                if (*m->extended)
                        req->err = -ecode;

                if (!req->err) {
                        req->err = v9fs_errstr2errno(ename);

                        if (!req->err) {        /* string match failed */
                                dprintk(DEBUG_ERROR, "unknown error: %s\n",
                                        ename);
                                req->err = -ESERVERFAULT;
                        }
                }
        } else if (req->tcall && req->rcall->id != req->tcall->id + 1) {
                dprintk(DEBUG_ERROR, "fcall mismatch: expected %d, got %d\n",
                        req->tcall->id + 1, req->rcall->id);
                if (!req->err)
                        req->err = -EIO;
        }

        if (req->cb && req->err != ERREQFLUSH) {
                dprintk(DEBUG_MUX, "calling callback tcall %p rcall %p\n",
                        req->tcall, req->rcall);

                (*req->cb) (req->cba, req->tcall, req->rcall, req->err);
                req->cb = NULL;
        } else
                kfree(req->rcall);

        if (tag != V9FS_NOTAG)
                v9fs_put_idpool(tag, &m->tidpool);

        wake_up(&m->equeue);
        kfree(req);
}

/**
 * v9fs_read_work - called when there is some data to be read from a transport
 */
static void v9fs_read_work(void *a)
{
        int n, err, rcallen;
        struct v9fs_mux_data *m;
        struct v9fs_req *req, *rptr, *rreq;
        struct v9fs_fcall *rcall;

        m = a;

        if (m->err < 0)
                return;

        rcall = NULL;
        dprintk(DEBUG_MUX, "start mux %p pos %d\n", m, m->rpos);
        clear_bit(Rpending, &m->wsched);
        err = m->trans->read(m->trans, m->rbuf + m->rpos, m->msize - m->rpos);
        dprintk(DEBUG_MUX, "mux %p got %d bytes\n", m, err);
        if (err == -EAGAIN) {
                clear_bit(Rworksched, &m->wsched);
                return;
        }

        if (err <= 0)
                goto error;

        m->rpos += err;
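        /*
         * Each 9P message starts with a 4-byte little-endian size field
         * that counts the whole message, the size field included; consume
         * messages from rbuf while at least one complete one is buffered.
         */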
        while (m->rpos > 4) {
                n = le32_to_cpu(*(__le32 *) m->rbuf);
                if (n >= m->msize) {
                        dprintk(DEBUG_ERROR,
                                "requested packet size too big: %d\n", n);
                        err = -EIO;
                        goto error;
                }

                if (m->rpos < n)
                        break;

                rcallen = n + V9FS_FCALLHDRSZ;
                rcall = kmalloc(rcallen, GFP_KERNEL);
                if (!rcall) {
                        err = -ENOMEM;
                        goto error;
                }

                dump_data(m->rbuf, n);
                err = v9fs_deserialize_fcall(m->rbuf, n, rcall, rcallen,
                                             *m->extended);
                if (err < 0) {
                        kfree(rcall);
                        goto error;
                }

                dprintk(DEBUG_MUX, "mux %p fcall id %d tag %d\n", m, rcall->id,
                        rcall->tag);

                req = NULL;
                spin_lock(&m->lock);
                list_for_each_entry_safe(rreq, rptr, &m->req_list, req_list) {
                        if (rreq->tag == rcall->tag) {
                                req = rreq;
                                req->rcall = rcall;
                                list_del(&req->req_list);
                                spin_unlock(&m->lock);
                                process_request(m, req);
                                break;
                        }
                }

                if (!req) {
                        spin_unlock(&m->lock);
                        if (err >= 0 && rcall->id != RFLUSH)
                                dprintk(DEBUG_ERROR,
                                        "unexpected response mux %p id %d tag %d\n",
                                        m, rcall->id, rcall->tag);
                        kfree(rcall);
                }

                if (m->rpos > n)
                        memmove(m->rbuf, m->rbuf + n, m->rpos - n);
                m->rpos -= n;
        }

        if (!list_empty(&m->req_list)) {
                if (test_and_clear_bit(Rpending, &m->wsched))
                        n = POLLIN;
                else
                        n = m->trans->poll(m->trans, NULL);

                if (n & POLLIN) {
                        dprintk(DEBUG_MUX, "schedule read work mux %p\n", m);
                        queue_work(v9fs_mux_wq, &m->rq);
                } else
                        clear_bit(Rworksched, &m->wsched);
        } else
                clear_bit(Rworksched, &m->wsched);

        return;

      error:
        v9fs_mux_cancel(m, err);
        clear_bit(Rworksched, &m->wsched);
}

/**
 * v9fs_send_request - send 9P request
 * The function can sleep until the request is scheduled for sending.
 * The function can be interrupted. Returning from the function is not
 * a guarantee that the request was sent successfully. On failure it
 * returns an error pointer that can be decoded with PTR_ERR().
 *
 * @m: mux data
 * @tc: request to be sent
 * @cb: callback function to call when response is received
 * @cba: parameter to pass to the callback function
 */
static struct v9fs_req *v9fs_send_request(struct v9fs_mux_data *m,
                                          struct v9fs_fcall *tc,
                                          v9fs_mux_req_callback cb, void *cba)
{
        int n;
        struct v9fs_req *req;

        dprintk(DEBUG_MUX, "mux %p task %p tcall %p id %d\n", m, current,
                tc, tc->id);
        if (m->err < 0)
                return ERR_PTR(m->err);

        req = kmalloc(sizeof(struct v9fs_req), GFP_KERNEL);
        if (!req)
                return ERR_PTR(-ENOMEM);

        /* the version request must use the special NOTAG value */
        if (tc->id == TVERSION)
                n = V9FS_NOTAG;
        else
                n = v9fs_get_idpool(&m->tidpool);

        if (n < 0) {
                kfree(req);
                return ERR_PTR(-ENOMEM);
        }

        tc->tag = n;
        req->tag = n;
        req->tcall = tc;
        req->rcall = NULL;
        req->err = 0;
        req->cb = cb;
        req->cba = cba;

        spin_lock(&m->lock);
        list_add_tail(&req->req_list, &m->unsent_req_list);
        spin_unlock(&m->lock);

        if (test_and_clear_bit(Wpending, &m->wsched))
                n = POLLOUT;
        else
                n = m->trans->poll(m->trans, NULL);

        if (n & POLLOUT && !test_and_set_bit(Wworksched, &m->wsched))
                queue_work(v9fs_mux_wq, &m->wq);

        return req;
}

static inline void
v9fs_mux_flush_cb(void *a, struct v9fs_fcall *tc, struct v9fs_fcall *rc,
                  int err)
{
        v9fs_mux_req_callback cb;
        int tag;
        struct v9fs_mux_data *m;
        struct v9fs_req *req, *rptr;

        m = a;
        dprintk(DEBUG_MUX, "mux %p tc %p rc %p err %d oldtag %d\n", m, tc,
                rc, err, tc->params.tflush.oldtag);

        spin_lock(&m->lock);
        cb = NULL;
        tag = tc->params.tflush.oldtag;
        list_for_each_entry_safe(req, rptr, &m->req_list, req_list) {
                if (req->tag == tag) {
                        list_del(&req->req_list);
                        if (req->cb) {
                                cb = req->cb;
                                req->cb = NULL;
                                spin_unlock(&m->lock);
                                (*cb) (req->cba, req->tcall, req->rcall,
                                       req->err);
                        }
                        kfree(req);
                        wake_up(&m->equeue);
                        break;
                }
        }

        if (!cb)
                spin_unlock(&m->lock);

        if (v9fs_check_idpool(tag, &m->tidpool))
                v9fs_put_idpool(tag, &m->tidpool);

        kfree(tc);
        kfree(rc);
}

static void
v9fs_mux_flush_request(struct v9fs_mux_data *m, struct v9fs_req *req)
{
        struct v9fs_fcall *fc;

        dprintk(DEBUG_MUX, "mux %p req %p tag %d\n", m, req, req->tag);

        fc = kmalloc(sizeof(struct v9fs_fcall), GFP_KERNEL);
        if (!fc)
                return;

        fc->id = TFLUSH;
        fc->params.tflush.oldtag = req->tag;

        v9fs_send_request(m, fc, v9fs_mux_flush_cb, m);
}
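
/*
 * Tflush asks the server to abort the outstanding request identified by
 * oldtag; the Rflush response and the flushed request's resources are
 * handled in v9fs_mux_flush_cb() above.
 */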

static void
v9fs_mux_rpc_cb(void *a, struct v9fs_fcall *tc, struct v9fs_fcall *rc, int err)
{
        struct v9fs_mux_rpc *r;

        if (err == ERREQFLUSH) {
                dprintk(DEBUG_MUX, "err req flush\n");
                return;
        }

        r = a;
        dprintk(DEBUG_MUX, "mux %p req %p tc %p rc %p err %d\n", r->m, r->req,
                tc, rc, err);
        r->rcall = rc;
        r->err = err;
        wake_up(&r->wqueue);
}

/**
 * v9fs_mux_rpc - sends 9P request and waits until a response is available.
 *      The function can be interrupted.
 * @m: mux data
 * @tc: request to be sent
 * @rc: pointer where a pointer to the response is stored
 */
int
v9fs_mux_rpc(struct v9fs_mux_data *m, struct v9fs_fcall *tc,
             struct v9fs_fcall **rc)
{
        int err;
        unsigned long flags;
        struct v9fs_req *req;
        struct v9fs_mux_rpc r;

        r.err = 0;
        r.rcall = NULL;
        r.m = m;
        init_waitqueue_head(&r.wqueue);

        if (rc)
                *rc = NULL;

        req = v9fs_send_request(m, tc, v9fs_mux_rpc_cb, &r);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                dprintk(DEBUG_MUX, "error %d\n", err);
                return err;
        }

        r.req = req;
        dprintk(DEBUG_MUX, "mux %p tc %p tag %d rpc %p req %p\n", m, tc,
                req->tag, &r, req);
        err = wait_event_interruptible(r.wqueue, r.rcall != NULL || r.err < 0);
        if (r.err < 0)
                err = r.err;

        if (err == -ERESTARTSYS && m->trans->status == Connected && m->err == 0) {
                /* the wait was interrupted: flush the request, clearing the
                 * pending signal so the Tflush itself can be sent */
                spin_lock(&m->lock);
                req->tcall = NULL;
                req->err = ERREQFLUSH;
                spin_unlock(&m->lock);

                clear_thread_flag(TIF_SIGPENDING);
                v9fs_mux_flush_request(m, req);
                spin_lock_irqsave(&current->sighand->siglock, flags);
                recalc_sigpending();
                spin_unlock_irqrestore(&current->sighand->siglock, flags);
        }

        if (!err) {
                if (r.rcall)
                        dprintk(DEBUG_MUX, "got response id %d tag %d\n",
                                r.rcall->id, r.rcall->tag);

                if (rc)
                        *rc = r.rcall;
                else
                        kfree(r.rcall);
        } else {
                kfree(r.rcall);
                dprintk(DEBUG_MUX, "got error %d\n", err);
                if (err > 0)
                        err = -EIO;
        }

        return err;
}
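
/*
 * Example (sketch): a typical synchronous caller looks like
 *
 *      struct v9fs_fcall *rc = NULL;
 *
 *      err = v9fs_mux_rpc(m, tc, &rc);
 *      if (!err) {
 *              ... use the response ...
 *              kfree(rc);
 *      }
 *
 * Passing rc == NULL makes the function free the response internally.
 */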

/**
 * v9fs_mux_rpcnb - sends 9P request without waiting for response.
 * @m: mux data
 * @tc: request to be sent
 * @cb: callback function to be called when response arrives
 * @a: value to pass to the callback function
 */
int v9fs_mux_rpcnb(struct v9fs_mux_data *m, struct v9fs_fcall *tc,
                   v9fs_mux_req_callback cb, void *a)
{
        int err;
        struct v9fs_req *req;

        req = v9fs_send_request(m, tc, cb, a);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                dprintk(DEBUG_MUX, "error %d\n", err);
                return err;
        }

        dprintk(DEBUG_MUX, "mux %p tc %p tag %d\n", m, tc, req->tag);
        return 0;
}

/**
 * v9fs_mux_cancel - cancel all pending requests with error
 * @m: mux data
 * @err: error code
 */
void v9fs_mux_cancel(struct v9fs_mux_data *m, int err)
{
        struct v9fs_req *req, *rtmp;
        LIST_HEAD(cancel_list);

        dprintk(DEBUG_MUX, "mux %p err %d\n", m, err);
        m->err = err;
        spin_lock(&m->lock);
        list_for_each_entry_safe(req, rtmp, &m->req_list, req_list) {
                list_move(&req->req_list, &cancel_list);
        }
        spin_unlock(&m->lock);

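        /*
         * Run the completions outside m->lock: the callbacks may sleep or
         * re-enter the mux, which is why the requests were first moved to
         * the private cancel_list above.
         */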
        list_for_each_entry_safe(req, rtmp, &cancel_list, req_list) {
                list_del(&req->req_list);
                if (!req->err)
                        req->err = err;

                if (req->cb)
                        (*req->cb) (req->cba, req->tcall, req->rcall, req->err);
                else
                        kfree(req->rcall);

                kfree(req);
        }

        wake_up(&m->equeue);
}
941 }