]> bbs.cooldavid.org Git - net-next-2.6.git/blob - net/sunrpc/clnt.c
SUNRPC: Move remaining RPC client related task initialisation into clnt.c
[net-next-2.6.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23
24 #include <asm/system.h>
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/kallsyms.h>
29 #include <linux/mm.h>
30 #include <linux/namei.h>
31 #include <linux/mount.h>
32 #include <linux/slab.h>
33 #include <linux/utsname.h>
34 #include <linux/workqueue.h>
35 #include <linux/in6.h>
36
37 #include <linux/sunrpc/clnt.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41
42 #include "sunrpc.h"
43
44 #ifdef RPC_DEBUG
45 # define RPCDBG_FACILITY        RPCDBG_CALL
46 #endif
47
48 #define dprint_status(t)                                        \
49         dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
50                         __func__, t->tk_status)
51
52 /*
53  * All RPC clients are linked into this list
54  */
55 static LIST_HEAD(all_clients);
56 static DEFINE_SPINLOCK(rpc_client_lock);
57
58 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
59
60
61 static void     call_start(struct rpc_task *task);
62 static void     call_reserve(struct rpc_task *task);
63 static void     call_reserveresult(struct rpc_task *task);
64 static void     call_allocate(struct rpc_task *task);
65 static void     call_decode(struct rpc_task *task);
66 static void     call_bind(struct rpc_task *task);
67 static void     call_bind_status(struct rpc_task *task);
68 static void     call_transmit(struct rpc_task *task);
69 #if defined(CONFIG_NFS_V4_1)
70 static void     call_bc_transmit(struct rpc_task *task);
71 #endif /* CONFIG_NFS_V4_1 */
72 static void     call_status(struct rpc_task *task);
73 static void     call_transmit_status(struct rpc_task *task);
74 static void     call_refresh(struct rpc_task *task);
75 static void     call_refreshresult(struct rpc_task *task);
76 static void     call_timeout(struct rpc_task *task);
77 static void     call_connect(struct rpc_task *task);
78 static void     call_connect_status(struct rpc_task *task);
79
80 static __be32   *rpc_encode_header(struct rpc_task *task);
81 static __be32   *rpc_verify_header(struct rpc_task *task);
82 static int      rpc_ping(struct rpc_clnt *clnt);
83
84 static void rpc_register_client(struct rpc_clnt *clnt)
85 {
86         spin_lock(&rpc_client_lock);
87         list_add(&clnt->cl_clients, &all_clients);
88         spin_unlock(&rpc_client_lock);
89 }
90
91 static void rpc_unregister_client(struct rpc_clnt *clnt)
92 {
93         spin_lock(&rpc_client_lock);
94         list_del(&clnt->cl_clients);
95         spin_unlock(&rpc_client_lock);
96 }
97
98 static int
99 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
100 {
101         static uint32_t clntid;
102         struct nameidata nd;
103         struct path path;
104         char name[15];
105         struct qstr q = {
106                 .name = name,
107         };
108         int error;
109
110         clnt->cl_path.mnt = ERR_PTR(-ENOENT);
111         clnt->cl_path.dentry = ERR_PTR(-ENOENT);
112         if (dir_name == NULL)
113                 return 0;
114
115         path.mnt = rpc_get_mount();
116         if (IS_ERR(path.mnt))
117                 return PTR_ERR(path.mnt);
118         error = vfs_path_lookup(path.mnt->mnt_root, path.mnt, dir_name, 0, &nd);
119         if (error)
120                 goto err;
121
122         for (;;) {
123                 q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
124                 name[sizeof(name) - 1] = '\0';
125                 q.hash = full_name_hash(q.name, q.len);
126                 path.dentry = rpc_create_client_dir(nd.path.dentry, &q, clnt);
127                 if (!IS_ERR(path.dentry))
128                         break;
129                 error = PTR_ERR(path.dentry);
130                 if (error != -EEXIST) {
131                         printk(KERN_INFO "RPC: Couldn't create pipefs entry"
132                                         " %s/%s, error %d\n",
133                                         dir_name, name, error);
134                         goto err_path_put;
135                 }
136         }
137         path_put(&nd.path);
138         clnt->cl_path = path;
139         return 0;
140 err_path_put:
141         path_put(&nd.path);
142 err:
143         rpc_put_mount();
144         return error;
145 }
146
147 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
148 {
149         struct rpc_program      *program = args->program;
150         struct rpc_version      *version;
151         struct rpc_clnt         *clnt = NULL;
152         struct rpc_auth         *auth;
153         int err;
154         size_t len;
155
156         /* sanity check the name before trying to print it */
157         err = -EINVAL;
158         len = strlen(args->servername);
159         if (len > RPC_MAXNETNAMELEN)
160                 goto out_no_rpciod;
161         len++;
162
163         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
164                         program->name, args->servername, xprt);
165
166         err = rpciod_up();
167         if (err)
168                 goto out_no_rpciod;
169         err = -EINVAL;
170         if (!xprt)
171                 goto out_no_xprt;
172
173         if (args->version >= program->nrvers)
174                 goto out_err;
175         version = program->version[args->version];
176         if (version == NULL)
177                 goto out_err;
178
179         err = -ENOMEM;
180         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
181         if (!clnt)
182                 goto out_err;
183         clnt->cl_parent = clnt;
184
185         clnt->cl_server = clnt->cl_inline_name;
186         if (len > sizeof(clnt->cl_inline_name)) {
187                 char *buf = kmalloc(len, GFP_KERNEL);
188                 if (buf != NULL)
189                         clnt->cl_server = buf;
190                 else
191                         len = sizeof(clnt->cl_inline_name);
192         }
193         strlcpy(clnt->cl_server, args->servername, len);
194
195         clnt->cl_xprt     = xprt;
196         clnt->cl_procinfo = version->procs;
197         clnt->cl_maxproc  = version->nrprocs;
198         clnt->cl_protname = program->name;
199         clnt->cl_prog     = args->prognumber ? : program->number;
200         clnt->cl_vers     = version->number;
201         clnt->cl_stats    = program->stats;
202         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
203         err = -ENOMEM;
204         if (clnt->cl_metrics == NULL)
205                 goto out_no_stats;
206         clnt->cl_program  = program;
207         INIT_LIST_HEAD(&clnt->cl_tasks);
208         spin_lock_init(&clnt->cl_lock);
209
210         if (!xprt_bound(clnt->cl_xprt))
211                 clnt->cl_autobind = 1;
212
213         clnt->cl_timeout = xprt->timeout;
214         if (args->timeout != NULL) {
215                 memcpy(&clnt->cl_timeout_default, args->timeout,
216                                 sizeof(clnt->cl_timeout_default));
217                 clnt->cl_timeout = &clnt->cl_timeout_default;
218         }
219
220         clnt->cl_rtt = &clnt->cl_rtt_default;
221         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
222         clnt->cl_principal = NULL;
223         if (args->client_name) {
224                 clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
225                 if (!clnt->cl_principal)
226                         goto out_no_principal;
227         }
228
229         kref_init(&clnt->cl_kref);
230
231         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
232         if (err < 0)
233                 goto out_no_path;
234
235         auth = rpcauth_create(args->authflavor, clnt);
236         if (IS_ERR(auth)) {
237                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
238                                 args->authflavor);
239                 err = PTR_ERR(auth);
240                 goto out_no_auth;
241         }
242
243         /* save the nodename */
244         clnt->cl_nodelen = strlen(init_utsname()->nodename);
245         if (clnt->cl_nodelen > UNX_MAXNODENAME)
246                 clnt->cl_nodelen = UNX_MAXNODENAME;
247         memcpy(clnt->cl_nodename, init_utsname()->nodename, clnt->cl_nodelen);
248         rpc_register_client(clnt);
249         return clnt;
250
251 out_no_auth:
252         if (!IS_ERR(clnt->cl_path.dentry)) {
253                 rpc_remove_client_dir(clnt->cl_path.dentry);
254                 rpc_put_mount();
255         }
256 out_no_path:
257         kfree(clnt->cl_principal);
258 out_no_principal:
259         rpc_free_iostats(clnt->cl_metrics);
260 out_no_stats:
261         if (clnt->cl_server != clnt->cl_inline_name)
262                 kfree(clnt->cl_server);
263         kfree(clnt);
264 out_err:
265         xprt_put(xprt);
266 out_no_xprt:
267         rpciod_down();
268 out_no_rpciod:
269         return ERR_PTR(err);
270 }
271
272 /*
273  * rpc_create - create an RPC client and transport with one call
274  * @args: rpc_clnt create argument structure
275  *
276  * Creates and initializes an RPC transport and an RPC client.
277  *
278  * It can ping the server in order to determine if it is up, and to see if
279  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
280  * this behavior so asynchronous tasks can also use rpc_create.
281  */
282 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
283 {
284         struct rpc_xprt *xprt;
285         struct rpc_clnt *clnt;
286         struct xprt_create xprtargs = {
287                 .ident = args->protocol,
288                 .srcaddr = args->saddress,
289                 .dstaddr = args->address,
290                 .addrlen = args->addrsize,
291                 .bc_xprt = args->bc_xprt,
292         };
293         char servername[48];
294
295         /*
296          * If the caller chooses not to specify a hostname, whip
297          * up a string representation of the passed-in address.
298          */
299         if (args->servername == NULL) {
300                 servername[0] = '\0';
301                 switch (args->address->sa_family) {
302                 case AF_INET: {
303                         struct sockaddr_in *sin =
304                                         (struct sockaddr_in *)args->address;
305                         snprintf(servername, sizeof(servername), "%pI4",
306                                  &sin->sin_addr.s_addr);
307                         break;
308                 }
309                 case AF_INET6: {
310                         struct sockaddr_in6 *sin =
311                                         (struct sockaddr_in6 *)args->address;
312                         snprintf(servername, sizeof(servername), "%pI6",
313                                  &sin->sin6_addr);
314                         break;
315                 }
316                 default:
317                         /* caller wants default server name, but
318                          * address family isn't recognized. */
319                         return ERR_PTR(-EINVAL);
320                 }
321                 args->servername = servername;
322         }
323
324         xprt = xprt_create_transport(&xprtargs);
325         if (IS_ERR(xprt))
326                 return (struct rpc_clnt *)xprt;
327
328         /*
329          * By default, kernel RPC client connects from a reserved port.
330          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
331          * but it is always enabled for rpciod, which handles the connect
332          * operation.
333          */
334         xprt->resvport = 1;
335         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
336                 xprt->resvport = 0;
337
338         clnt = rpc_new_client(args, xprt);
339         if (IS_ERR(clnt))
340                 return clnt;
341
342         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
343                 int err = rpc_ping(clnt);
344                 if (err != 0) {
345                         rpc_shutdown_client(clnt);
346                         return ERR_PTR(err);
347                 }
348         }
349
350         clnt->cl_softrtry = 1;
351         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
352                 clnt->cl_softrtry = 0;
353
354         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
355                 clnt->cl_autobind = 1;
356         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
357                 clnt->cl_discrtry = 1;
358         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
359                 clnt->cl_chatty = 1;
360
361         return clnt;
362 }
363 EXPORT_SYMBOL_GPL(rpc_create);
364
365 /*
366  * This function clones the RPC client structure. It allows us to share the
367  * same transport while varying parameters such as the authentication
368  * flavour.
369  */
370 struct rpc_clnt *
371 rpc_clone_client(struct rpc_clnt *clnt)
372 {
373         struct rpc_clnt *new;
374         int err = -ENOMEM;
375
376         new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
377         if (!new)
378                 goto out_no_clnt;
379         new->cl_parent = clnt;
380         /* Turn off autobind on clones */
381         new->cl_autobind = 0;
382         INIT_LIST_HEAD(&new->cl_tasks);
383         spin_lock_init(&new->cl_lock);
384         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_timeout->to_initval);
385         new->cl_metrics = rpc_alloc_iostats(clnt);
386         if (new->cl_metrics == NULL)
387                 goto out_no_stats;
388         if (clnt->cl_principal) {
389                 new->cl_principal = kstrdup(clnt->cl_principal, GFP_KERNEL);
390                 if (new->cl_principal == NULL)
391                         goto out_no_principal;
392         }
393         kref_init(&new->cl_kref);
394         err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
395         if (err != 0)
396                 goto out_no_path;
397         if (new->cl_auth)
398                 atomic_inc(&new->cl_auth->au_count);
399         xprt_get(clnt->cl_xprt);
400         kref_get(&clnt->cl_kref);
401         rpc_register_client(new);
402         rpciod_up();
403         return new;
404 out_no_path:
405         kfree(new->cl_principal);
406 out_no_principal:
407         rpc_free_iostats(new->cl_metrics);
408 out_no_stats:
409         kfree(new);
410 out_no_clnt:
411         dprintk("RPC:       %s: returned error %d\n", __func__, err);
412         return ERR_PTR(err);
413 }
414 EXPORT_SYMBOL_GPL(rpc_clone_client);
415
416 /*
417  * Kill all tasks for the given client.
418  * XXX: kill their descendants as well?
419  */
420 void rpc_killall_tasks(struct rpc_clnt *clnt)
421 {
422         struct rpc_task *rovr;
423
424
425         if (list_empty(&clnt->cl_tasks))
426                 return;
427         dprintk("RPC:       killing all tasks for client %p\n", clnt);
428         /*
429          * Spin lock all_tasks to prevent changes...
430          */
431         spin_lock(&clnt->cl_lock);
432         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
433                 if (!RPC_IS_ACTIVATED(rovr))
434                         continue;
435                 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
436                         rovr->tk_flags |= RPC_TASK_KILLED;
437                         rpc_exit(rovr, -EIO);
438                         rpc_wake_up_queued_task(rovr->tk_waitqueue, rovr);
439                 }
440         }
441         spin_unlock(&clnt->cl_lock);
442 }
443 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
444
445 /*
446  * Properly shut down an RPC client, terminating all outstanding
447  * requests.
448  */
449 void rpc_shutdown_client(struct rpc_clnt *clnt)
450 {
451         dprintk("RPC:       shutting down %s client for %s\n",
452                         clnt->cl_protname, clnt->cl_server);
453
454         while (!list_empty(&clnt->cl_tasks)) {
455                 rpc_killall_tasks(clnt);
456                 wait_event_timeout(destroy_wait,
457                         list_empty(&clnt->cl_tasks), 1*HZ);
458         }
459
460         rpc_release_client(clnt);
461 }
462 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
463
464 /*
465  * Free an RPC client
466  */
467 static void
468 rpc_free_client(struct kref *kref)
469 {
470         struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
471
472         dprintk("RPC:       destroying %s client for %s\n",
473                         clnt->cl_protname, clnt->cl_server);
474         if (!IS_ERR(clnt->cl_path.dentry)) {
475                 rpc_remove_client_dir(clnt->cl_path.dentry);
476                 rpc_put_mount();
477         }
478         if (clnt->cl_parent != clnt) {
479                 rpc_release_client(clnt->cl_parent);
480                 goto out_free;
481         }
482         if (clnt->cl_server != clnt->cl_inline_name)
483                 kfree(clnt->cl_server);
484 out_free:
485         rpc_unregister_client(clnt);
486         rpc_free_iostats(clnt->cl_metrics);
487         kfree(clnt->cl_principal);
488         clnt->cl_metrics = NULL;
489         xprt_put(clnt->cl_xprt);
490         rpciod_down();
491         kfree(clnt);
492 }
493
494 /*
495  * Free an RPC client
496  */
497 static void
498 rpc_free_auth(struct kref *kref)
499 {
500         struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
501
502         if (clnt->cl_auth == NULL) {
503                 rpc_free_client(kref);
504                 return;
505         }
506
507         /*
508          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
509          *       release remaining GSS contexts. This mechanism ensures
510          *       that it can do so safely.
511          */
512         kref_init(kref);
513         rpcauth_release(clnt->cl_auth);
514         clnt->cl_auth = NULL;
515         kref_put(kref, rpc_free_client);
516 }
517
518 /*
519  * Release reference to the RPC client
520  */
521 void
522 rpc_release_client(struct rpc_clnt *clnt)
523 {
524         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
525
526         if (list_empty(&clnt->cl_tasks))
527                 wake_up(&destroy_wait);
528         kref_put(&clnt->cl_kref, rpc_free_auth);
529 }
530
531 /**
532  * rpc_bind_new_program - bind a new RPC program to an existing client
533  * @old: old rpc_client
534  * @program: rpc program to set
535  * @vers: rpc program version
536  *
537  * Clones the rpc client and sets up a new RPC program. This is mainly
538  * of use for enabling different RPC programs to share the same transport.
539  * The Sun NFSv2/v3 ACL protocol can do this.
540  */
541 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
542                                       struct rpc_program *program,
543                                       u32 vers)
544 {
545         struct rpc_clnt *clnt;
546         struct rpc_version *version;
547         int err;
548
549         BUG_ON(vers >= program->nrvers || !program->version[vers]);
550         version = program->version[vers];
551         clnt = rpc_clone_client(old);
552         if (IS_ERR(clnt))
553                 goto out;
554         clnt->cl_procinfo = version->procs;
555         clnt->cl_maxproc  = version->nrprocs;
556         clnt->cl_protname = program->name;
557         clnt->cl_prog     = program->number;
558         clnt->cl_vers     = version->number;
559         clnt->cl_stats    = program->stats;
560         err = rpc_ping(clnt);
561         if (err != 0) {
562                 rpc_shutdown_client(clnt);
563                 clnt = ERR_PTR(err);
564         }
565 out:
566         return clnt;
567 }
568 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
569
570 void rpc_task_release_client(struct rpc_task *task)
571 {
572         struct rpc_clnt *clnt = task->tk_client;
573
574         if (clnt != NULL) {
575                 /* Remove from client task list */
576                 spin_lock(&clnt->cl_lock);
577                 list_del(&task->tk_task);
578                 spin_unlock(&clnt->cl_lock);
579                 task->tk_client = NULL;
580
581                 rpc_release_client(clnt);
582         }
583 }
584
585 static
586 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
587 {
588         if (clnt != NULL) {
589                 rpc_task_release_client(task);
590                 task->tk_client = clnt;
591                 kref_get(&clnt->cl_kref);
592                 if (clnt->cl_softrtry)
593                         task->tk_flags |= RPC_TASK_SOFT;
594                 /* Add to the client's list of all tasks */
595                 spin_lock(&clnt->cl_lock);
596                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
597                 spin_unlock(&clnt->cl_lock);
598         }
599 }
600
601 static void
602 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
603 {
604         if (msg != NULL) {
605                 task->tk_msg.rpc_proc = msg->rpc_proc;
606                 task->tk_msg.rpc_argp = msg->rpc_argp;
607                 task->tk_msg.rpc_resp = msg->rpc_resp;
608                 /* Bind the user cred */
609                 rpcauth_bindcred(task, msg->rpc_cred, task->tk_flags);
610         }
611 }
612
613 /*
614  * Default callback for async RPC calls
615  */
616 static void
617 rpc_default_callback(struct rpc_task *task, void *data)
618 {
619 }
620
621 static const struct rpc_call_ops rpc_default_ops = {
622         .rpc_call_done = rpc_default_callback,
623 };
624
625 /**
626  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
627  * @task_setup_data: pointer to task initialisation data
628  */
629 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
630 {
631         struct rpc_task *task;
632
633         task = rpc_new_task(task_setup_data);
634         if (IS_ERR(task))
635                 goto out;
636
637         rpc_task_set_client(task, task_setup_data->rpc_client);
638         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
639
640         if (task->tk_status != 0) {
641                 int ret = task->tk_status;
642                 rpc_put_task(task);
643                 return ERR_PTR(ret);
644         }
645
646         if (task->tk_action == NULL)
647                 rpc_call_start(task);
648
649         atomic_inc(&task->tk_count);
650         rpc_execute(task);
651 out:
652         return task;
653 }
654 EXPORT_SYMBOL_GPL(rpc_run_task);
655
656 /**
657  * rpc_call_sync - Perform a synchronous RPC call
658  * @clnt: pointer to RPC client
659  * @msg: RPC call parameters
660  * @flags: RPC call flags
661  */
662 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
663 {
664         struct rpc_task *task;
665         struct rpc_task_setup task_setup_data = {
666                 .rpc_client = clnt,
667                 .rpc_message = msg,
668                 .callback_ops = &rpc_default_ops,
669                 .flags = flags,
670         };
671         int status;
672
673         BUG_ON(flags & RPC_TASK_ASYNC);
674
675         task = rpc_run_task(&task_setup_data);
676         if (IS_ERR(task))
677                 return PTR_ERR(task);
678         status = task->tk_status;
679         rpc_put_task(task);
680         return status;
681 }
682 EXPORT_SYMBOL_GPL(rpc_call_sync);
683
684 /**
685  * rpc_call_async - Perform an asynchronous RPC call
686  * @clnt: pointer to RPC client
687  * @msg: RPC call parameters
688  * @flags: RPC call flags
689  * @tk_ops: RPC call ops
690  * @data: user call data
691  */
692 int
693 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
694                const struct rpc_call_ops *tk_ops, void *data)
695 {
696         struct rpc_task *task;
697         struct rpc_task_setup task_setup_data = {
698                 .rpc_client = clnt,
699                 .rpc_message = msg,
700                 .callback_ops = tk_ops,
701                 .callback_data = data,
702                 .flags = flags|RPC_TASK_ASYNC,
703         };
704
705         task = rpc_run_task(&task_setup_data);
706         if (IS_ERR(task))
707                 return PTR_ERR(task);
708         rpc_put_task(task);
709         return 0;
710 }
711 EXPORT_SYMBOL_GPL(rpc_call_async);
712
713 #if defined(CONFIG_NFS_V4_1)
714 /**
715  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
716  * rpc_execute against it
717  * @req: RPC request
718  * @tk_ops: RPC call ops
719  */
720 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
721                                 const struct rpc_call_ops *tk_ops)
722 {
723         struct rpc_task *task;
724         struct xdr_buf *xbufp = &req->rq_snd_buf;
725         struct rpc_task_setup task_setup_data = {
726                 .callback_ops = tk_ops,
727         };
728
729         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
730         /*
731          * Create an rpc_task to send the data
732          */
733         task = rpc_new_task(&task_setup_data);
734         if (IS_ERR(task)) {
735                 xprt_free_bc_request(req);
736                 goto out;
737         }
738         task->tk_rqstp = req;
739
740         /*
741          * Set up the xdr_buf length.
742          * This also indicates that the buffer is XDR encoded already.
743          */
744         xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
745                         xbufp->tail[0].iov_len;
746
747         task->tk_action = call_bc_transmit;
748         atomic_inc(&task->tk_count);
749         BUG_ON(atomic_read(&task->tk_count) != 2);
750         rpc_execute(task);
751
752 out:
753         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
754         return task;
755 }
756 #endif /* CONFIG_NFS_V4_1 */
757
758 void
759 rpc_call_start(struct rpc_task *task)
760 {
761         task->tk_action = call_start;
762 }
763 EXPORT_SYMBOL_GPL(rpc_call_start);
764
765 /**
766  * rpc_peeraddr - extract remote peer address from clnt's xprt
767  * @clnt: RPC client structure
768  * @buf: target buffer
769  * @bufsize: length of target buffer
770  *
771  * Returns the number of bytes that are actually in the stored address.
772  */
773 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
774 {
775         size_t bytes;
776         struct rpc_xprt *xprt = clnt->cl_xprt;
777
778         bytes = sizeof(xprt->addr);
779         if (bytes > bufsize)
780                 bytes = bufsize;
781         memcpy(buf, &clnt->cl_xprt->addr, bytes);
782         return xprt->addrlen;
783 }
784 EXPORT_SYMBOL_GPL(rpc_peeraddr);
785
786 /**
787  * rpc_peeraddr2str - return remote peer address in printable format
788  * @clnt: RPC client structure
789  * @format: address format
790  *
791  */
792 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
793                              enum rpc_display_format_t format)
794 {
795         struct rpc_xprt *xprt = clnt->cl_xprt;
796
797         if (xprt->address_strings[format] != NULL)
798                 return xprt->address_strings[format];
799         else
800                 return "unprintable";
801 }
802 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
803
804 void
805 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
806 {
807         struct rpc_xprt *xprt = clnt->cl_xprt;
808         if (xprt->ops->set_buffer_size)
809                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
810 }
811 EXPORT_SYMBOL_GPL(rpc_setbufsize);
812
813 /*
814  * Return size of largest payload RPC client can support, in bytes
815  *
816  * For stream transports, this is one RPC record fragment (see RFC
817  * 1831), as we don't support multi-record requests yet.  For datagram
818  * transports, this is the size of an IP packet minus the IP, UDP, and
819  * RPC header sizes.
820  */
821 size_t rpc_max_payload(struct rpc_clnt *clnt)
822 {
823         return clnt->cl_xprt->max_payload;
824 }
825 EXPORT_SYMBOL_GPL(rpc_max_payload);
826
827 /**
828  * rpc_force_rebind - force transport to check that remote port is unchanged
829  * @clnt: client to rebind
830  *
831  */
832 void rpc_force_rebind(struct rpc_clnt *clnt)
833 {
834         if (clnt->cl_autobind)
835                 xprt_clear_bound(clnt->cl_xprt);
836 }
837 EXPORT_SYMBOL_GPL(rpc_force_rebind);
838
839 /*
840  * Restart an (async) RPC call from the call_prepare state.
841  * Usually called from within the exit handler.
842  */
843 int
844 rpc_restart_call_prepare(struct rpc_task *task)
845 {
846         if (RPC_ASSASSINATED(task))
847                 return 0;
848         task->tk_action = rpc_prepare_task;
849         return 1;
850 }
851 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
852
853 /*
854  * Restart an (async) RPC call. Usually called from within the
855  * exit handler.
856  */
857 int
858 rpc_restart_call(struct rpc_task *task)
859 {
860         if (RPC_ASSASSINATED(task))
861                 return 0;
862         task->tk_action = call_start;
863         return 1;
864 }
865 EXPORT_SYMBOL_GPL(rpc_restart_call);
866
867 #ifdef RPC_DEBUG
868 static const char *rpc_proc_name(const struct rpc_task *task)
869 {
870         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
871
872         if (proc) {
873                 if (proc->p_name)
874                         return proc->p_name;
875                 else
876                         return "NULL";
877         } else
878                 return "no proc";
879 }
880 #endif
881
882 /*
883  * 0.  Initial state
884  *
885  *     Other FSM states can be visited zero or more times, but
886  *     this state is visited exactly once for each RPC.
887  */
888 static void
889 call_start(struct rpc_task *task)
890 {
891         struct rpc_clnt *clnt = task->tk_client;
892
893         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
894                         clnt->cl_protname, clnt->cl_vers,
895                         rpc_proc_name(task),
896                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
897
898         /* Increment call count */
899         task->tk_msg.rpc_proc->p_count++;
900         clnt->cl_stats->rpccnt++;
901         task->tk_action = call_reserve;
902 }
903
904 /*
905  * 1.   Reserve an RPC call slot
906  */
907 static void
908 call_reserve(struct rpc_task *task)
909 {
910         dprint_status(task);
911
912         if (!rpcauth_uptodatecred(task)) {
913                 task->tk_action = call_refresh;
914                 return;
915         }
916
917         task->tk_status  = 0;
918         task->tk_action  = call_reserveresult;
919         xprt_reserve(task);
920 }
921
922 /*
923  * 1b.  Grok the result of xprt_reserve()
924  */
925 static void
926 call_reserveresult(struct rpc_task *task)
927 {
928         int status = task->tk_status;
929
930         dprint_status(task);
931
932         /*
933          * After a call to xprt_reserve(), we must have either
934          * a request slot or else an error status.
935          */
936         task->tk_status = 0;
937         if (status >= 0) {
938                 if (task->tk_rqstp) {
939                         task->tk_action = call_allocate;
940                         return;
941                 }
942
943                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
944                                 __func__, status);
945                 rpc_exit(task, -EIO);
946                 return;
947         }
948
949         /*
950          * Even though there was an error, we may have acquired
951          * a request slot somehow.  Make sure not to leak it.
952          */
953         if (task->tk_rqstp) {
954                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
955                                 __func__, status);
956                 xprt_release(task);
957         }
958
959         switch (status) {
960         case -EAGAIN:   /* woken up; retry */
961                 task->tk_action = call_reserve;
962                 return;
963         case -EIO:      /* probably a shutdown */
964                 break;
965         default:
966                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
967                                 __func__, status);
968                 break;
969         }
970         rpc_exit(task, status);
971 }
972
973 /*
974  * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
975  *      (Note: buffer memory is freed in xprt_release).
976  */
977 static void
978 call_allocate(struct rpc_task *task)
979 {
980         unsigned int slack = task->tk_msg.rpc_cred->cr_auth->au_cslack;
981         struct rpc_rqst *req = task->tk_rqstp;
982         struct rpc_xprt *xprt = task->tk_xprt;
983         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
984
985         dprint_status(task);
986
987         task->tk_status = 0;
988         task->tk_action = call_bind;
989
990         if (req->rq_buffer)
991                 return;
992
993         if (proc->p_proc != 0) {
994                 BUG_ON(proc->p_arglen == 0);
995                 if (proc->p_decode != NULL)
996                         BUG_ON(proc->p_replen == 0);
997         }
998
999         /*
1000          * Calculate the size (in quads) of the RPC call
1001          * and reply headers, and convert both values
1002          * to byte sizes.
1003          */
1004         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1005         req->rq_callsize <<= 2;
1006         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1007         req->rq_rcvsize <<= 2;
1008
1009         req->rq_buffer = xprt->ops->buf_alloc(task,
1010                                         req->rq_callsize + req->rq_rcvsize);
1011         if (req->rq_buffer != NULL)
1012                 return;
1013
1014         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1015
1016         if (RPC_IS_ASYNC(task) || !signalled()) {
1017                 task->tk_action = call_allocate;
1018                 rpc_delay(task, HZ>>4);
1019                 return;
1020         }
1021
1022         rpc_exit(task, -ERESTARTSYS);
1023 }
1024
1025 static inline int
1026 rpc_task_need_encode(struct rpc_task *task)
1027 {
1028         return task->tk_rqstp->rq_snd_buf.len == 0;
1029 }
1030
1031 static inline void
1032 rpc_task_force_reencode(struct rpc_task *task)
1033 {
1034         task->tk_rqstp->rq_snd_buf.len = 0;
1035         task->tk_rqstp->rq_bytes_sent = 0;
1036 }
1037
1038 static inline void
1039 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1040 {
1041         buf->head[0].iov_base = start;
1042         buf->head[0].iov_len = len;
1043         buf->tail[0].iov_len = 0;
1044         buf->page_len = 0;
1045         buf->flags = 0;
1046         buf->len = 0;
1047         buf->buflen = len;
1048 }
1049
1050 /*
1051  * 3.   Encode arguments of an RPC call
1052  */
1053 static void
1054 rpc_xdr_encode(struct rpc_task *task)
1055 {
1056         struct rpc_rqst *req = task->tk_rqstp;
1057         kxdrproc_t      encode;
1058         __be32          *p;
1059
1060         dprint_status(task);
1061
1062         rpc_xdr_buf_init(&req->rq_snd_buf,
1063                          req->rq_buffer,
1064                          req->rq_callsize);
1065         rpc_xdr_buf_init(&req->rq_rcv_buf,
1066                          (char *)req->rq_buffer + req->rq_callsize,
1067                          req->rq_rcvsize);
1068
1069         p = rpc_encode_header(task);
1070         if (p == NULL) {
1071                 printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1072                 rpc_exit(task, -EIO);
1073                 return;
1074         }
1075
1076         encode = task->tk_msg.rpc_proc->p_encode;
1077         if (encode == NULL)
1078                 return;
1079
1080         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1081                         task->tk_msg.rpc_argp);
1082 }
1083
1084 /*
1085  * 4.   Get the server port number if not yet set
1086  */
1087 static void
1088 call_bind(struct rpc_task *task)
1089 {
1090         struct rpc_xprt *xprt = task->tk_xprt;
1091
1092         dprint_status(task);
1093
1094         task->tk_action = call_connect;
1095         if (!xprt_bound(xprt)) {
1096                 task->tk_action = call_bind_status;
1097                 task->tk_timeout = xprt->bind_timeout;
1098                 xprt->ops->rpcbind(task);
1099         }
1100 }
1101
1102 /*
1103  * 4a.  Sort out bind result
1104  */
1105 static void
1106 call_bind_status(struct rpc_task *task)
1107 {
1108         int status = -EIO;
1109
1110         if (task->tk_status >= 0) {
1111                 dprint_status(task);
1112                 task->tk_status = 0;
1113                 task->tk_action = call_connect;
1114                 return;
1115         }
1116
1117         switch (task->tk_status) {
1118         case -ENOMEM:
1119                 dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1120                 rpc_delay(task, HZ >> 2);
1121                 goto retry_timeout;
1122         case -EACCES:
1123                 dprintk("RPC: %5u remote rpcbind: RPC program/version "
1124                                 "unavailable\n", task->tk_pid);
1125                 /* fail immediately if this is an RPC ping */
1126                 if (task->tk_msg.rpc_proc->p_proc == 0) {
1127                         status = -EOPNOTSUPP;
1128                         break;
1129                 }
1130                 rpc_delay(task, 3*HZ);
1131                 goto retry_timeout;
1132         case -ETIMEDOUT:
1133                 dprintk("RPC: %5u rpcbind request timed out\n",
1134                                 task->tk_pid);
1135                 goto retry_timeout;
1136         case -EPFNOSUPPORT:
1137                 /* server doesn't support any rpcbind version we know of */
1138                 dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1139                                 task->tk_pid);
1140                 break;
1141         case -EPROTONOSUPPORT:
1142                 dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1143                                 task->tk_pid);
1144                 task->tk_status = 0;
1145                 task->tk_action = call_bind;
1146                 return;
1147         case -ECONNREFUSED:             /* connection problems */
1148         case -ECONNRESET:
1149         case -ENOTCONN:
1150         case -EHOSTDOWN:
1151         case -EHOSTUNREACH:
1152         case -ENETUNREACH:
1153         case -EPIPE:
1154                 dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1155                                 task->tk_pid, task->tk_status);
1156                 if (!RPC_IS_SOFTCONN(task)) {
1157                         rpc_delay(task, 5*HZ);
1158                         goto retry_timeout;
1159                 }
1160                 status = task->tk_status;
1161                 break;
1162         default:
1163                 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1164                                 task->tk_pid, -task->tk_status);
1165         }
1166
1167         rpc_exit(task, status);
1168         return;
1169
1170 retry_timeout:
1171         task->tk_action = call_timeout;
1172 }
1173
1174 /*
1175  * 4b.  Connect to the RPC server
1176  */
1177 static void
1178 call_connect(struct rpc_task *task)
1179 {
1180         struct rpc_xprt *xprt = task->tk_xprt;
1181
1182         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1183                         task->tk_pid, xprt,
1184                         (xprt_connected(xprt) ? "is" : "is not"));
1185
1186         task->tk_action = call_transmit;
1187         if (!xprt_connected(xprt)) {
1188                 task->tk_action = call_connect_status;
1189                 if (task->tk_status < 0)
1190                         return;
1191                 xprt_connect(task);
1192         }
1193 }
1194
1195 /*
1196  * 4c.  Sort out connect result
1197  */
1198 static void
1199 call_connect_status(struct rpc_task *task)
1200 {
1201         struct rpc_clnt *clnt = task->tk_client;
1202         int status = task->tk_status;
1203
1204         dprint_status(task);
1205
1206         task->tk_status = 0;
1207         if (status >= 0 || status == -EAGAIN) {
1208                 clnt->cl_stats->netreconn++;
1209                 task->tk_action = call_transmit;
1210                 return;
1211         }
1212
1213         switch (status) {
1214                 /* if soft mounted, test if we've timed out */
1215         case -ETIMEDOUT:
1216                 task->tk_action = call_timeout;
1217                 break;
1218         default:
1219                 rpc_exit(task, -EIO);
1220         }
1221 }
1222
1223 /*
1224  * 5.   Transmit the RPC request, and wait for reply
1225  */
1226 static void
1227 call_transmit(struct rpc_task *task)
1228 {
1229         dprint_status(task);
1230
1231         task->tk_action = call_status;
1232         if (task->tk_status < 0)
1233                 return;
1234         task->tk_status = xprt_prepare_transmit(task);
1235         if (task->tk_status != 0)
1236                 return;
1237         task->tk_action = call_transmit_status;
1238         /* Encode here so that rpcsec_gss can use correct sequence number. */
1239         if (rpc_task_need_encode(task)) {
1240                 BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
1241                 rpc_xdr_encode(task);
1242                 /* Did the encode result in an error condition? */
1243                 if (task->tk_status != 0) {
1244                         /* Was the error nonfatal? */
1245                         if (task->tk_status == -EAGAIN)
1246                                 rpc_delay(task, HZ >> 4);
1247                         else
1248                                 rpc_exit(task, task->tk_status);
1249                         return;
1250                 }
1251         }
1252         xprt_transmit(task);
1253         if (task->tk_status < 0)
1254                 return;
1255         /*
1256          * On success, ensure that we call xprt_end_transmit() before sleeping
1257          * in order to allow access to the socket to other RPC requests.
1258          */
1259         call_transmit_status(task);
1260         if (rpc_reply_expected(task))
1261                 return;
1262         task->tk_action = rpc_exit_task;
1263         rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
1264 }
1265
1266 /*
1267  * 5a.  Handle cleanup after a transmission
1268  */
1269 static void
1270 call_transmit_status(struct rpc_task *task)
1271 {
1272         task->tk_action = call_status;
1273
1274         /*
1275          * Common case: success.  Force the compiler to put this
1276          * test first.
1277          */
1278         if (task->tk_status == 0) {
1279                 xprt_end_transmit(task);
1280                 rpc_task_force_reencode(task);
1281                 return;
1282         }
1283
1284         switch (task->tk_status) {
1285         case -EAGAIN:
1286                 break;
1287         default:
1288                 dprint_status(task);
1289                 xprt_end_transmit(task);
1290                 rpc_task_force_reencode(task);
1291                 break;
1292                 /*
1293                  * Special cases: if we've been waiting on the
1294                  * socket's write_space() callback, or if the
1295                  * socket just returned a connection error,
1296                  * then hold onto the transport lock.
1297                  */
1298         case -ECONNREFUSED:
1299         case -EHOSTDOWN:
1300         case -EHOSTUNREACH:
1301         case -ENETUNREACH:
1302                 if (RPC_IS_SOFTCONN(task)) {
1303                         xprt_end_transmit(task);
1304                         rpc_exit(task, task->tk_status);
1305                         break;
1306                 }
1307         case -ECONNRESET:
1308         case -ENOTCONN:
1309         case -EPIPE:
1310                 rpc_task_force_reencode(task);
1311         }
1312 }
1313
1314 #if defined(CONFIG_NFS_V4_1)
1315 /*
1316  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
1317  * addition, disconnect on connectivity errors.
1318  */
1319 static void
1320 call_bc_transmit(struct rpc_task *task)
1321 {
1322         struct rpc_rqst *req = task->tk_rqstp;
1323
1324         BUG_ON(task->tk_status != 0);
1325         task->tk_status = xprt_prepare_transmit(task);
1326         if (task->tk_status == -EAGAIN) {
1327                 /*
1328                  * Could not reserve the transport. Try again after the
1329                  * transport is released.
1330                  */
1331                 task->tk_status = 0;
1332                 task->tk_action = call_bc_transmit;
1333                 return;
1334         }
1335
1336         task->tk_action = rpc_exit_task;
1337         if (task->tk_status < 0) {
1338                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1339                         "error: %d\n", task->tk_status);
1340                 return;
1341         }
1342
1343         xprt_transmit(task);
1344         xprt_end_transmit(task);
1345         dprint_status(task);
1346         switch (task->tk_status) {
1347         case 0:
1348                 /* Success */
1349                 break;
1350         case -EHOSTDOWN:
1351         case -EHOSTUNREACH:
1352         case -ENETUNREACH:
1353         case -ETIMEDOUT:
1354                 /*
1355                  * Problem reaching the server.  Disconnect and let the
1356                  * forechannel reestablish the connection.  The server will
1357                  * have to retransmit the backchannel request and we'll
1358                  * reprocess it.  Since these ops are idempotent, there's no
1359                  * need to cache our reply at this time.
1360                  */
1361                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1362                         "error: %d\n", task->tk_status);
1363                 xprt_conditional_disconnect(task->tk_xprt,
1364                         req->rq_connect_cookie);
1365                 break;
1366         default:
1367                 /*
1368                  * We were unable to reply and will have to drop the
1369                  * request.  The server should reconnect and retransmit.
1370                  */
1371                 BUG_ON(task->tk_status == -EAGAIN);
1372                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1373                         "error: %d\n", task->tk_status);
1374                 break;
1375         }
1376         rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1377 }
1378 #endif /* CONFIG_NFS_V4_1 */
1379
1380 /*
1381  * 6.   Sort out the RPC call status
1382  */
1383 static void
1384 call_status(struct rpc_task *task)
1385 {
1386         struct rpc_clnt *clnt = task->tk_client;
1387         struct rpc_rqst *req = task->tk_rqstp;
1388         int             status;
1389
1390         if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
1391                 task->tk_status = req->rq_reply_bytes_recvd;
1392
1393         dprint_status(task);
1394
1395         status = task->tk_status;
1396         if (status >= 0) {
1397                 task->tk_action = call_decode;
1398                 return;
1399         }
1400
1401         task->tk_status = 0;
1402         switch(status) {
1403         case -EHOSTDOWN:
1404         case -EHOSTUNREACH:
1405         case -ENETUNREACH:
1406                 /*
1407                  * Delay any retries for 3 seconds, then handle as if it
1408                  * were a timeout.
1409                  */
1410                 rpc_delay(task, 3*HZ);
1411         case -ETIMEDOUT:
1412                 task->tk_action = call_timeout;
1413                 if (task->tk_client->cl_discrtry)
1414                         xprt_conditional_disconnect(task->tk_xprt,
1415                                         req->rq_connect_cookie);
1416                 break;
1417         case -ECONNRESET:
1418         case -ECONNREFUSED:
1419                 rpc_force_rebind(clnt);
1420                 rpc_delay(task, 3*HZ);
1421         case -EPIPE:
1422         case -ENOTCONN:
1423                 task->tk_action = call_bind;
1424                 break;
1425         case -EAGAIN:
1426                 task->tk_action = call_transmit;
1427                 break;
1428         case -EIO:
1429                 /* shutdown or soft timeout */
1430                 rpc_exit(task, status);
1431                 break;
1432         default:
1433                 if (clnt->cl_chatty)
1434                         printk("%s: RPC call returned error %d\n",
1435                                clnt->cl_protname, -status);
1436                 rpc_exit(task, status);
1437         }
1438 }
1439
1440 /*
1441  * 6a.  Handle RPC timeout
1442  *      We do not release the request slot, so we keep using the
1443  *      same XID for all retransmits.
1444  */
1445 static void
1446 call_timeout(struct rpc_task *task)
1447 {
1448         struct rpc_clnt *clnt = task->tk_client;
1449
1450         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1451                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1452                 goto retry;
1453         }
1454
1455         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1456         task->tk_timeouts++;
1457
1458         if (RPC_IS_SOFTCONN(task)) {
1459                 rpc_exit(task, -ETIMEDOUT);
1460                 return;
1461         }
1462         if (RPC_IS_SOFT(task)) {
1463                 if (clnt->cl_chatty)
1464                         printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1465                                 clnt->cl_protname, clnt->cl_server);
1466                 rpc_exit(task, -EIO);
1467                 return;
1468         }
1469
1470         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1471                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1472                 if (clnt->cl_chatty)
1473                         printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1474                         clnt->cl_protname, clnt->cl_server);
1475         }
1476         rpc_force_rebind(clnt);
1477         /*
1478          * Did our request time out due to an RPCSEC_GSS out-of-sequence
1479          * event? RFC2203 requires the server to drop all such requests.
1480          */
1481         rpcauth_invalcred(task);
1482
1483 retry:
1484         clnt->cl_stats->rpcretrans++;
1485         task->tk_action = call_bind;
1486         task->tk_status = 0;
1487 }
1488
1489 /*
1490  * 7.   Decode the RPC reply
1491  */
1492 static void
1493 call_decode(struct rpc_task *task)
1494 {
1495         struct rpc_clnt *clnt = task->tk_client;
1496         struct rpc_rqst *req = task->tk_rqstp;
1497         kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
1498         __be32          *p;
1499
1500         dprintk("RPC: %5u call_decode (status %d)\n",
1501                         task->tk_pid, task->tk_status);
1502
1503         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1504                 if (clnt->cl_chatty)
1505                         printk(KERN_NOTICE "%s: server %s OK\n",
1506                                 clnt->cl_protname, clnt->cl_server);
1507                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1508         }
1509
1510         /*
1511          * Ensure that we see all writes made by xprt_complete_rqst()
1512          * before it changed req->rq_reply_bytes_recvd.
1513          */
1514         smp_rmb();
1515         req->rq_rcv_buf.len = req->rq_private_buf.len;
1516
1517         /* Check that the softirq receive buffer is valid */
1518         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1519                                 sizeof(req->rq_rcv_buf)) != 0);
1520
1521         if (req->rq_rcv_buf.len < 12) {
1522                 if (!RPC_IS_SOFT(task)) {
1523                         task->tk_action = call_bind;
1524                         clnt->cl_stats->rpcretrans++;
1525                         goto out_retry;
1526                 }
1527                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1528                                 clnt->cl_protname, task->tk_status);
1529                 task->tk_action = call_timeout;
1530                 goto out_retry;
1531         }
1532
1533         p = rpc_verify_header(task);
1534         if (IS_ERR(p)) {
1535                 if (p == ERR_PTR(-EAGAIN))
1536                         goto out_retry;
1537                 return;
1538         }
1539
1540         task->tk_action = rpc_exit_task;
1541
1542         if (decode) {
1543                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1544                                                       task->tk_msg.rpc_resp);
1545         }
1546         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1547                         task->tk_status);
1548         return;
1549 out_retry:
1550         task->tk_status = 0;
1551         /* Note: rpc_verify_header() may have freed the RPC slot */
1552         if (task->tk_rqstp == req) {
1553                 req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
1554                 if (task->tk_client->cl_discrtry)
1555                         xprt_conditional_disconnect(task->tk_xprt,
1556                                         req->rq_connect_cookie);
1557         }
1558 }
1559
1560 /*
1561  * 8.   Refresh the credentials if rejected by the server
1562  */
1563 static void
1564 call_refresh(struct rpc_task *task)
1565 {
1566         dprint_status(task);
1567
1568         task->tk_action = call_refreshresult;
1569         task->tk_status = 0;
1570         task->tk_client->cl_stats->rpcauthrefresh++;
1571         rpcauth_refreshcred(task);
1572 }
1573
1574 /*
1575  * 8a.  Process the results of a credential refresh
1576  */
1577 static void
1578 call_refreshresult(struct rpc_task *task)
1579 {
1580         int status = task->tk_status;
1581
1582         dprint_status(task);
1583
1584         task->tk_status = 0;
1585         task->tk_action = call_reserve;
1586         if (status >= 0 && rpcauth_uptodatecred(task))
1587                 return;
1588         if (status == -EACCES) {
1589                 rpc_exit(task, -EACCES);
1590                 return;
1591         }
1592         task->tk_action = call_refresh;
1593         if (status != -ETIMEDOUT)
1594                 rpc_delay(task, 3*HZ);
1595 }
1596
1597 static __be32 *
1598 rpc_encode_header(struct rpc_task *task)
1599 {
1600         struct rpc_clnt *clnt = task->tk_client;
1601         struct rpc_rqst *req = task->tk_rqstp;
1602         __be32          *p = req->rq_svec[0].iov_base;
1603
1604         /* FIXME: check buffer size? */
1605
1606         p = xprt_skip_transport_header(task->tk_xprt, p);
1607         *p++ = req->rq_xid;             /* XID */
1608         *p++ = htonl(RPC_CALL);         /* CALL */
1609         *p++ = htonl(RPC_VERSION);      /* RPC version */
1610         *p++ = htonl(clnt->cl_prog);    /* program number */
1611         *p++ = htonl(clnt->cl_vers);    /* program version */
1612         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1613         p = rpcauth_marshcred(task, p);
1614         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1615         return p;
1616 }
1617
1618 static __be32 *
1619 rpc_verify_header(struct rpc_task *task)
1620 {
1621         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1622         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1623         __be32  *p = iov->iov_base;
1624         u32 n;
1625         int error = -EACCES;
1626
1627         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
1628                 /* RFC-1014 says that the representation of XDR data must be a
1629                  * multiple of four bytes
1630                  * - if it isn't pointer subtraction in the NFS client may give
1631                  *   undefined results
1632                  */
1633                 dprintk("RPC: %5u %s: XDR representation not a multiple of"
1634                        " 4 bytes: 0x%x\n", task->tk_pid, __func__,
1635                        task->tk_rqstp->rq_rcv_buf.len);
1636                 goto out_eio;
1637         }
1638         if ((len -= 3) < 0)
1639                 goto out_overflow;
1640
1641         p += 1; /* skip XID */
1642         if ((n = ntohl(*p++)) != RPC_REPLY) {
1643                 dprintk("RPC: %5u %s: not an RPC reply: %x\n",
1644                         task->tk_pid, __func__, n);
1645                 goto out_garbage;
1646         }
1647
1648         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1649                 if (--len < 0)
1650                         goto out_overflow;
1651                 switch ((n = ntohl(*p++))) {
1652                         case RPC_AUTH_ERROR:
1653                                 break;
1654                         case RPC_MISMATCH:
1655                                 dprintk("RPC: %5u %s: RPC call version "
1656                                                 "mismatch!\n",
1657                                                 task->tk_pid, __func__);
1658                                 error = -EPROTONOSUPPORT;
1659                                 goto out_err;
1660                         default:
1661                                 dprintk("RPC: %5u %s: RPC call rejected, "
1662                                                 "unknown error: %x\n",
1663                                                 task->tk_pid, __func__, n);
1664                                 goto out_eio;
1665                 }
1666                 if (--len < 0)
1667                         goto out_overflow;
1668                 switch ((n = ntohl(*p++))) {
1669                 case RPC_AUTH_REJECTEDCRED:
1670                 case RPC_AUTH_REJECTEDVERF:
1671                 case RPCSEC_GSS_CREDPROBLEM:
1672                 case RPCSEC_GSS_CTXPROBLEM:
1673                         if (!task->tk_cred_retry)
1674                                 break;
1675                         task->tk_cred_retry--;
1676                         dprintk("RPC: %5u %s: retry stale creds\n",
1677                                         task->tk_pid, __func__);
1678                         rpcauth_invalcred(task);
1679                         /* Ensure we obtain a new XID! */
1680                         xprt_release(task);
1681                         task->tk_action = call_refresh;
1682                         goto out_retry;
1683                 case RPC_AUTH_BADCRED:
1684                 case RPC_AUTH_BADVERF:
1685                         /* possibly garbled cred/verf? */
1686                         if (!task->tk_garb_retry)
1687                                 break;
1688                         task->tk_garb_retry--;
1689                         dprintk("RPC: %5u %s: retry garbled creds\n",
1690                                         task->tk_pid, __func__);
1691                         task->tk_action = call_bind;
1692                         goto out_retry;
1693                 case RPC_AUTH_TOOWEAK:
1694                         printk(KERN_NOTICE "RPC: server %s requires stronger "
1695                                "authentication.\n", task->tk_client->cl_server);
1696                         break;
1697                 default:
1698                         dprintk("RPC: %5u %s: unknown auth error: %x\n",
1699                                         task->tk_pid, __func__, n);
1700                         error = -EIO;
1701                 }
1702                 dprintk("RPC: %5u %s: call rejected %d\n",
1703                                 task->tk_pid, __func__, n);
1704                 goto out_err;
1705         }
1706         if (!(p = rpcauth_checkverf(task, p))) {
1707                 dprintk("RPC: %5u %s: auth check failed\n",
1708                                 task->tk_pid, __func__);
1709                 goto out_garbage;               /* bad verifier, retry */
1710         }
1711         len = p - (__be32 *)iov->iov_base - 1;
1712         if (len < 0)
1713                 goto out_overflow;
1714         switch ((n = ntohl(*p++))) {
1715         case RPC_SUCCESS:
1716                 return p;
1717         case RPC_PROG_UNAVAIL:
1718                 dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
1719                                 task->tk_pid, __func__,
1720                                 (unsigned int)task->tk_client->cl_prog,
1721                                 task->tk_client->cl_server);
1722                 error = -EPFNOSUPPORT;
1723                 goto out_err;
1724         case RPC_PROG_MISMATCH:
1725                 dprintk("RPC: %5u %s: program %u, version %u unsupported by "
1726                                 "server %s\n", task->tk_pid, __func__,
1727                                 (unsigned int)task->tk_client->cl_prog,
1728                                 (unsigned int)task->tk_client->cl_vers,
1729                                 task->tk_client->cl_server);
1730                 error = -EPROTONOSUPPORT;
1731                 goto out_err;
1732         case RPC_PROC_UNAVAIL:
1733                 dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
1734                                 "version %u on server %s\n",
1735                                 task->tk_pid, __func__,
1736                                 rpc_proc_name(task),
1737                                 task->tk_client->cl_prog,
1738                                 task->tk_client->cl_vers,
1739                                 task->tk_client->cl_server);
1740                 error = -EOPNOTSUPP;
1741                 goto out_err;
1742         case RPC_GARBAGE_ARGS:
1743                 dprintk("RPC: %5u %s: server saw garbage\n",
1744                                 task->tk_pid, __func__);
1745                 break;                  /* retry */
1746         default:
1747                 dprintk("RPC: %5u %s: server accept status: %x\n",
1748                                 task->tk_pid, __func__, n);
1749                 /* Also retry */
1750         }
1751
1752 out_garbage:
1753         task->tk_client->cl_stats->rpcgarbage++;
1754         if (task->tk_garb_retry) {
1755                 task->tk_garb_retry--;
1756                 dprintk("RPC: %5u %s: retrying\n",
1757                                 task->tk_pid, __func__);
1758                 task->tk_action = call_bind;
1759 out_retry:
1760                 return ERR_PTR(-EAGAIN);
1761         }
1762 out_eio:
1763         error = -EIO;
1764 out_err:
1765         rpc_exit(task, error);
1766         dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
1767                         __func__, error);
1768         return ERR_PTR(error);
1769 out_overflow:
1770         dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
1771                         __func__);
1772         goto out_garbage;
1773 }
1774
1775 static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
1776 {
1777         return 0;
1778 }
1779
1780 static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
1781 {
1782         return 0;
1783 }
1784
1785 static struct rpc_procinfo rpcproc_null = {
1786         .p_encode = rpcproc_encode_null,
1787         .p_decode = rpcproc_decode_null,
1788 };
1789
1790 static int rpc_ping(struct rpc_clnt *clnt)
1791 {
1792         struct rpc_message msg = {
1793                 .rpc_proc = &rpcproc_null,
1794         };
1795         int err;
1796         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1797         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
1798         put_rpccred(msg.rpc_cred);
1799         return err;
1800 }
1801
1802 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
1803 {
1804         struct rpc_message msg = {
1805                 .rpc_proc = &rpcproc_null,
1806                 .rpc_cred = cred,
1807         };
1808         struct rpc_task_setup task_setup_data = {
1809                 .rpc_client = clnt,
1810                 .rpc_message = &msg,
1811                 .callback_ops = &rpc_default_ops,
1812                 .flags = flags,
1813         };
1814         return rpc_run_task(&task_setup_data);
1815 }
1816 EXPORT_SYMBOL_GPL(rpc_call_null);
1817
1818 #ifdef RPC_DEBUG
1819 static void rpc_show_header(void)
1820 {
1821         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
1822                 "-timeout ---ops--\n");
1823 }
1824
1825 static void rpc_show_task(const struct rpc_clnt *clnt,
1826                           const struct rpc_task *task)
1827 {
1828         const char *rpc_waitq = "none";
1829         char *p, action[KSYM_SYMBOL_LEN];
1830
1831         if (RPC_IS_QUEUED(task))
1832                 rpc_waitq = rpc_qname(task->tk_waitqueue);
1833
1834         /* map tk_action pointer to a function name; then trim off
1835          * the "+0x0 [sunrpc]" */
1836         sprint_symbol(action, (unsigned long)task->tk_action);
1837         p = strchr(action, '+');
1838         if (p)
1839                 *p = '\0';
1840
1841         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%s q:%s\n",
1842                 task->tk_pid, task->tk_flags, task->tk_status,
1843                 clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
1844                 clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
1845                 action, rpc_waitq);
1846 }
1847
1848 void rpc_show_tasks(void)
1849 {
1850         struct rpc_clnt *clnt;
1851         struct rpc_task *task;
1852         int header = 0;
1853
1854         spin_lock(&rpc_client_lock);
1855         list_for_each_entry(clnt, &all_clients, cl_clients) {
1856                 spin_lock(&clnt->cl_lock);
1857                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
1858                         if (!header) {
1859                                 rpc_show_header();
1860                                 header++;
1861                         }
1862                         rpc_show_task(clnt, task);
1863                 }
1864                 spin_unlock(&clnt->cl_lock);
1865         }
1866         spin_unlock(&rpc_client_lock);
1867 }
1868 #endif