]> bbs.cooldavid.org Git - net-next-2.6.git/blob - net/sunrpc/xprtsock.c
net: Remove unnecessary returns from void function()s
[net-next-2.6.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/module.h>
24 #include <linux/capability.h>
25 #include <linux/pagemap.h>
26 #include <linux/errno.h>
27 #include <linux/socket.h>
28 #include <linux/in.h>
29 #include <linux/net.h>
30 #include <linux/mm.h>
31 #include <linux/udp.h>
32 #include <linux/tcp.h>
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/sched.h>
35 #include <linux/sunrpc/svcsock.h>
36 #include <linux/sunrpc/xprtsock.h>
37 #include <linux/file.h>
38 #ifdef CONFIG_NFS_V4_1
39 #include <linux/sunrpc/bc_xprt.h>
40 #endif
41
42 #include <net/sock.h>
43 #include <net/checksum.h>
44 #include <net/udp.h>
45 #include <net/tcp.h>
46
47 #include "sunrpc.h"
48 /*
49  * xprtsock tunables
50  */
51 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
52 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
53
54 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
55 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
56
57 #define XS_TCP_LINGER_TO        (15U * HZ)
58 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
59
/*
 * Further files can be added under /proc/sys/sunrpc simply by calling
 * register_sysctl_table() again: the directory ends up holding the union
 * of everything that was registered there.
 *
 * The only thing to watch out for is a clash with a file name that
 * somebody else has already registered.
 */

#ifdef RPC_DEBUG

/* Lower/upper bounds handed to proc_dointvec_minmax via extra1/extra2. */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size,
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size,
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit,
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit,
	},
	{
		/* Stored in jiffies; no min/max bounds are applied. */
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },	/* table terminator */
};

/* Parent "sunrpc" directory entry holding the tunables above. */
static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table,
	},
	{ },	/* table terminator */
};

#endif
139
/*
 * Timeout for connecting an RPC UDP socket.  A UDP connect completes
 * synchronously, but a timeout is set regardless in case the local host
 * runs out of resources.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * How long to wait for an RPC TCP connection to come up.  As an example,
 * Solaris NFS over TCP uses 60 seconds, which matches the time a server
 * typically needs to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * How long to wait for the RPC portmapper to reply.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Back-off applied after a UDP socket connect error.  Such an error most
 * likely points at a resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout lets a client pause briefly before reconnecting
 * to a server that has just dropped the connection.
 *
 * Reconnecting a TCP transport uses exponential backoff.  Some servers
 * drop TCP connections when they are overloaded, so the first timeout is
 * short and it grows for as long as the server stays down or silent.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout: the client drops the transport socket once it has
 * been idle this long.  UDP sockets are timed out as well, so that port
 * numbers are not held while there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

/*
 * With RPC_DEBUG enabled, packet dumping (RPC_DEBUG_DATA) is switched off
 * and debug output from this file is filed under the transport facility.
 */
#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
188
189 #ifdef RPC_DEBUG_DATA
190 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
191 {
192         u8 *buf = (u8 *) packet;
193         int j;
194
195         dprintk("RPC:       %s\n", msg);
196         for (j = 0; j < count && j < 128; j += 4) {
197                 if (!(j & 31)) {
198                         if (j)
199                                 dprintk("\n");
200                         dprintk("0x%04x ", j);
201                 }
202                 dprintk("%02x%02x%02x%02x ",
203                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
204         }
205         dprintk("\n");
206 }
207 #else
208 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
209 {
210         /* NOP */
211 }
212 #endif
213
214 struct sock_xprt {
215         struct rpc_xprt         xprt;
216
217         /*
218          * Network layer
219          */
220         struct socket *         sock;
221         struct sock *           inet;
222
223         /*
224          * State of TCP reply receive
225          */
226         __be32                  tcp_fraghdr,
227                                 tcp_xid;
228
229         u32                     tcp_offset,
230                                 tcp_reclen;
231
232         unsigned long           tcp_copied,
233                                 tcp_flags;
234
235         /*
236          * Connection of transports
237          */
238         struct delayed_work     connect_worker;
239         struct sockaddr_storage srcaddr;
240         unsigned short          srcport;
241
242         /*
243          * UDP socket buffer size parameters
244          */
245         size_t                  rcvsize,
246                                 sndsize;
247
248         /*
249          * Saved socket callback addresses
250          */
251         void                    (*old_data_ready)(struct sock *, int);
252         void                    (*old_state_change)(struct sock *);
253         void                    (*old_write_space)(struct sock *);
254         void                    (*old_error_report)(struct sock *);
255 };
256
/*
 * TCP receive state flags (bits kept in sock_xprt.tcp_flags)
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags (share sock_xprt.tcp_flags with the bits above)
 */
#define TCP_RPC_REPLY		(1UL << 6)
271
272 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
273 {
274         return (struct sockaddr *) &xprt->addr;
275 }
276
277 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
278 {
279         return (struct sockaddr_in *) &xprt->addr;
280 }
281
282 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
283 {
284         return (struct sockaddr_in6 *) &xprt->addr;
285 }
286
287 static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
288 {
289         struct sockaddr *sap = xs_addr(xprt);
290         struct sockaddr_in6 *sin6;
291         struct sockaddr_in *sin;
292         char buf[128];
293
294         (void)rpc_ntop(sap, buf, sizeof(buf));
295         xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
296
297         switch (sap->sa_family) {
298         case AF_INET:
299                 sin = xs_addr_in(xprt);
300                 snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
301                 break;
302         case AF_INET6:
303                 sin6 = xs_addr_in6(xprt);
304                 snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
305                 break;
306         default:
307                 BUG();
308         }
309         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
310 }
311
312 static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
313 {
314         struct sockaddr *sap = xs_addr(xprt);
315         char buf[128];
316
317         snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
318         xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
319
320         snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
321         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
322 }
323
324 static void xs_format_peer_addresses(struct rpc_xprt *xprt,
325                                      const char *protocol,
326                                      const char *netid)
327 {
328         xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
329         xprt->address_strings[RPC_DISPLAY_NETID] = netid;
330         xs_format_common_peer_addresses(xprt);
331         xs_format_common_peer_ports(xprt);
332 }
333
334 static void xs_update_peer_port(struct rpc_xprt *xprt)
335 {
336         kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
337         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
338
339         xs_format_common_peer_ports(xprt);
340 }
341
342 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
343 {
344         unsigned int i;
345
346         for (i = 0; i < RPC_DISPLAY_MAX; i++)
347                 switch (i) {
348                 case RPC_DISPLAY_PROTO:
349                 case RPC_DISPLAY_NETID:
350                         continue;
351                 default:
352                         kfree(xprt->address_strings[i]);
353                 }
354 }
355
356 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
357
358 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
359 {
360         struct msghdr msg = {
361                 .msg_name       = addr,
362                 .msg_namelen    = addrlen,
363                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
364         };
365         struct kvec iov = {
366                 .iov_base       = vec->iov_base + base,
367                 .iov_len        = vec->iov_len - base,
368         };
369
370         if (iov.iov_len != 0)
371                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
372         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
373 }
374
375 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
376 {
377         struct page **ppage;
378         unsigned int remainder;
379         int err, sent = 0;
380
381         remainder = xdr->page_len - base;
382         base += xdr->page_base;
383         ppage = xdr->pages + (base >> PAGE_SHIFT);
384         base &= ~PAGE_MASK;
385         for(;;) {
386                 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
387                 int flags = XS_SENDMSG_FLAGS;
388
389                 remainder -= len;
390                 if (remainder != 0 || more)
391                         flags |= MSG_MORE;
392                 err = sock->ops->sendpage(sock, *ppage, base, len, flags);
393                 if (remainder == 0 || err != len)
394                         break;
395                 sent += err;
396                 ppage++;
397                 base = 0;
398         }
399         if (sent == 0)
400                 return err;
401         if (err > 0)
402                 sent += err;
403         return sent;
404 }
405
406 /**
407  * xs_sendpages - write pages directly to a socket
408  * @sock: socket to send on
409  * @addr: UDP only -- address of destination
410  * @addrlen: UDP only -- length of destination address
411  * @xdr: buffer containing this request
412  * @base: starting position in the buffer
413  *
414  */
415 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
416 {
417         unsigned int remainder = xdr->len - base;
418         int err, sent = 0;
419
420         if (unlikely(!sock))
421                 return -ENOTSOCK;
422
423         clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
424         if (base != 0) {
425                 addr = NULL;
426                 addrlen = 0;
427         }
428
429         if (base < xdr->head[0].iov_len || addr != NULL) {
430                 unsigned int len = xdr->head[0].iov_len - base;
431                 remainder -= len;
432                 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
433                 if (remainder == 0 || err != len)
434                         goto out;
435                 sent += err;
436                 base = 0;
437         } else
438                 base -= xdr->head[0].iov_len;
439
440         if (base < xdr->page_len) {
441                 unsigned int len = xdr->page_len - base;
442                 remainder -= len;
443                 err = xs_send_pagedata(sock, xdr, base, remainder != 0);
444                 if (remainder == 0 || err != len)
445                         goto out;
446                 sent += err;
447                 base = 0;
448         } else
449                 base -= xdr->page_len;
450
451         if (base >= xdr->tail[0].iov_len)
452                 return sent;
453         err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
454 out:
455         if (sent == 0)
456                 return err;
457         if (err > 0)
458                 sent += err;
459         return sent;
460 }
461
462 static void xs_nospace_callback(struct rpc_task *task)
463 {
464         struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
465
466         transport->inet->sk_write_pending--;
467         clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
468 }
469
470 /**
471  * xs_nospace - place task on wait queue if transmit was incomplete
472  * @task: task to put to sleep
473  *
474  */
475 static int xs_nospace(struct rpc_task *task)
476 {
477         struct rpc_rqst *req = task->tk_rqstp;
478         struct rpc_xprt *xprt = req->rq_xprt;
479         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
480         int ret = 0;
481
482         dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
483                         task->tk_pid, req->rq_slen - req->rq_bytes_sent,
484                         req->rq_slen);
485
486         /* Protect against races with write_space */
487         spin_lock_bh(&xprt->transport_lock);
488
489         /* Don't race with disconnect */
490         if (xprt_connected(xprt)) {
491                 if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
492                         ret = -EAGAIN;
493                         /*
494                          * Notify TCP that we're limited by the application
495                          * window size
496                          */
497                         set_bit(SOCK_NOSPACE, &transport->sock->flags);
498                         transport->inet->sk_write_pending++;
499                         /* ...and wait for more buffer space */
500                         xprt_wait_for_buffer_space(task, xs_nospace_callback);
501                 }
502         } else {
503                 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
504                 ret = -ENOTCONN;
505         }
506
507         spin_unlock_bh(&xprt->transport_lock);
508         return ret;
509 }
510
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * Errors are returned as negative errno values.  After a short send or
 * -EAGAIN the result is whatever xs_nospace() returns.
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	if (status == -ENOTSOCK) {
		/* Should we call xs_close() here? */
		return -ENOTCONN;
	}
	if (status == -EAGAIN)
		return xs_nospace(task);

	if (status != -ENETUNREACH && status != -EPIPE &&
	    status != -ECONNREFUSED) {
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	/* When the server has died, an ICMP port unreachable message
	 * prompts ECONNREFUSED. */
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	return status;
}
573
574 /**
575  * xs_tcp_shutdown - gracefully shut down a TCP socket
576  * @xprt: transport
577  *
578  * Initiates a graceful shutdown of the TCP socket by calling the
579  * equivalent of shutdown(SHUT_WR);
580  */
581 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
582 {
583         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
584         struct socket *sock = transport->sock;
585
586         if (sock != NULL)
587                 kernel_sock_shutdown(sock, SHUT_WR);
588 }
589
590 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
591 {
592         u32 reclen = buf->len - sizeof(rpc_fraghdr);
593         rpc_fraghdr *base = buf->head[0].iov_base;
594         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
595 }
596
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * Errors are returned as negative errno values.
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	for (;;) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* No progress at all: treat it as a blocked socket */
		if (status == 0) {
			status = -EAGAIN;
			break;
		}
	}

	if (status == -ENOTSOCK) {
		/* Should we call xs_close() here? */
		return -ENOTCONN;
	}
	if (status == -EAGAIN)
		return xs_nospace(task);

	/*
	 * Every error except -ECONNREFUSED and -ENOTCONN shuts the socket
	 * down; errors other than -ECONNRESET and -EPIPE are logged first.
	 */
	if (status != -ECONNREFUSED && status != -ENOTCONN) {
		if (status != -ECONNRESET && status != -EPIPE) {
			dprintk("RPC:       sendmsg returned unrecognized error %d\n",
				-status);
		}
		xs_tcp_shutdown(xprt);
	}
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);

	return status;
}
674
675 /**
676  * xs_tcp_release_xprt - clean up after a tcp transmission
677  * @xprt: transport
678  * @task: rpc task
679  *
680  * This cleans up if an error causes us to abort the transmission of a request.
681  * In this case, the socket may need to be reset in order to avoid confusing
682  * the server.
683  */
684 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
685 {
686         struct rpc_rqst *req;
687
688         if (task != xprt->snd_task)
689                 return;
690         if (task == NULL)
691                 goto out_release;
692         req = task->tk_rqstp;
693         if (req->rq_bytes_sent == 0)
694                 goto out_release;
695         if (req->rq_bytes_sent == req->rq_snd_buf.len)
696                 goto out_release;
697         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
698 out_release:
699         xprt_release_xprt(xprt, task);
700 }
701
702 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
703 {
704         transport->old_data_ready = sk->sk_data_ready;
705         transport->old_state_change = sk->sk_state_change;
706         transport->old_write_space = sk->sk_write_space;
707         transport->old_error_report = sk->sk_error_report;
708 }
709
710 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
711 {
712         sk->sk_data_ready = transport->old_data_ready;
713         sk->sk_state_change = transport->old_state_change;
714         sk->sk_write_space = transport->old_write_space;
715         sk->sk_error_report = transport->old_error_report;
716 }
717
718 static void xs_reset_transport(struct sock_xprt *transport)
719 {
720         struct socket *sock = transport->sock;
721         struct sock *sk = transport->inet;
722
723         if (sk == NULL)
724                 return;
725
726         write_lock_bh(&sk->sk_callback_lock);
727         transport->inet = NULL;
728         transport->sock = NULL;
729
730         sk->sk_user_data = NULL;
731
732         xs_restore_old_callbacks(transport, sk);
733         write_unlock_bh(&sk->sk_callback_lock);
734
735         sk->sk_no_check = 0;
736
737         sock_release(sock);
738 }
739
740 /**
741  * xs_close - close a socket
742  * @xprt: transport
743  *
744  * This is used when all requests are complete; ie, no DRC state remains
745  * on the server we want to save.
746  *
747  * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
748  * xs_reset_transport() zeroing the socket from underneath a writer.
749  */
750 static void xs_close(struct rpc_xprt *xprt)
751 {
752         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
753
754         dprintk("RPC:       xs_close xprt %p\n", xprt);
755
756         xs_reset_transport(transport);
757         xprt->reestablish_timeout = 0;
758
759         smp_mb__before_clear_bit();
760         clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
761         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
762         clear_bit(XPRT_CLOSING, &xprt->state);
763         smp_mb__after_clear_bit();
764         xprt_disconnect_done(xprt);
765 }
766
767 static void xs_tcp_close(struct rpc_xprt *xprt)
768 {
769         if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
770                 xs_close(xprt);
771         else
772                 xs_tcp_shutdown(xprt);
773 }
774
775 /**
776  * xs_destroy - prepare to shutdown a transport
777  * @xprt: doomed transport
778  *
779  */
780 static void xs_destroy(struct rpc_xprt *xprt)
781 {
782         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
783
784         dprintk("RPC:       xs_destroy xprt %p\n", xprt);
785
786         cancel_rearming_delayed_work(&transport->connect_worker);
787
788         xs_close(xprt);
789         xs_free_peer_addresses(xprt);
790         kfree(xprt->slot);
791         kfree(xprt);
792         module_put(THIS_MODULE);
793 }
794
795 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
796 {
797         return (struct rpc_xprt *) sk->sk_user_data;
798 }
799
800 /**
801  * xs_udp_data_ready - "data ready" callback for UDP sockets
802  * @sk: socket with data to read
803  * @len: how much data to read
804  *
805  */
806 static void xs_udp_data_ready(struct sock *sk, int len)
807 {
808         struct rpc_task *task;
809         struct rpc_xprt *xprt;
810         struct rpc_rqst *rovr;
811         struct sk_buff *skb;
812         int err, repsize, copied;
813         u32 _xid;
814         __be32 *xp;
815
816         read_lock(&sk->sk_callback_lock);
817         dprintk("RPC:       xs_udp_data_ready...\n");
818         if (!(xprt = xprt_from_sock(sk)))
819                 goto out;
820
821         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
822                 goto out;
823
824         if (xprt->shutdown)
825                 goto dropit;
826
827         repsize = skb->len - sizeof(struct udphdr);
828         if (repsize < 4) {
829                 dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
830                 goto dropit;
831         }
832
833         /* Copy the XID from the skb... */
834         xp = skb_header_pointer(skb, sizeof(struct udphdr),
835                                 sizeof(_xid), &_xid);
836         if (xp == NULL)
837                 goto dropit;
838
839         /* Look up and lock the request corresponding to the given XID */
840         spin_lock(&xprt->transport_lock);
841         rovr = xprt_lookup_rqst(xprt, *xp);
842         if (!rovr)
843                 goto out_unlock;
844         task = rovr->rq_task;
845
846         if ((copied = rovr->rq_private_buf.buflen) > repsize)
847                 copied = repsize;
848
849         /* Suck it into the iovec, verify checksum if not done by hw. */
850         if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
851                 UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
852                 goto out_unlock;
853         }
854
855         UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
856
857         /* Something worked... */
858         dst_confirm(skb_dst(skb));
859
860         xprt_adjust_cwnd(task, copied);
861         xprt_update_rtt(task);
862         xprt_complete_rqst(task, copied);
863
864  out_unlock:
865         spin_unlock(&xprt->transport_lock);
866  dropit:
867         skb_free_datagram(sk, skb);
868  out:
869         read_unlock(&sk->sk_callback_lock);
870 }
871
872 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
873 {
874         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
875         size_t len, used;
876         char *p;
877
878         p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
879         len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
880         used = xdr_skb_read_bits(desc, p, len);
881         transport->tcp_offset += used;
882         if (used != len)
883                 return;
884
885         transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
886         if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
887                 transport->tcp_flags |= TCP_RCV_LAST_FRAG;
888         else
889                 transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
890         transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
891
892         transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
893         transport->tcp_offset = 0;
894
895         /* Sanity check of the record length */
896         if (unlikely(transport->tcp_reclen < 8)) {
897                 dprintk("RPC:       invalid TCP record fragment length\n");
898                 xprt_force_disconnect(xprt);
899                 return;
900         }
901         dprintk("RPC:       reading TCP record fragment of length %d\n",
902                         transport->tcp_reclen);
903 }
904
905 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
906 {
907         if (transport->tcp_offset == transport->tcp_reclen) {
908                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
909                 transport->tcp_offset = 0;
910                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
911                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
912                         transport->tcp_flags |= TCP_RCV_COPY_XID;
913                         transport->tcp_copied = 0;
914                 }
915         }
916 }
917
918 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
919 {
920         size_t len, used;
921         char *p;
922
923         len = sizeof(transport->tcp_xid) - transport->tcp_offset;
924         dprintk("RPC:       reading XID (%Zu bytes)\n", len);
925         p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
926         used = xdr_skb_read_bits(desc, p, len);
927         transport->tcp_offset += used;
928         if (used != len)
929                 return;
930         transport->tcp_flags &= ~TCP_RCV_COPY_XID;
931         transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
932         transport->tcp_copied = 4;
933         dprintk("RPC:       reading %s XID %08x\n",
934                         (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
935                                                               : "request with",
936                         ntohl(transport->tcp_xid));
937         xs_tcp_check_fraghdr(transport);
938 }
939
940 static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
941                                        struct xdr_skb_reader *desc)
942 {
943         size_t len, used;
944         u32 offset;
945         __be32  calldir;
946
947         /*
948          * We want transport->tcp_offset to be 8 at the end of this routine
949          * (4 bytes for the xid and 4 bytes for the call/reply flag).
950          * When this function is called for the first time,
951          * transport->tcp_offset is 4 (after having already read the xid).
952          */
953         offset = transport->tcp_offset - sizeof(transport->tcp_xid);
954         len = sizeof(calldir) - offset;
955         dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
956         used = xdr_skb_read_bits(desc, &calldir, len);
957         transport->tcp_offset += used;
958         if (used != len)
959                 return;
960         transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
961         transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
962         transport->tcp_flags |= TCP_RCV_COPY_DATA;
963         /*
964          * We don't yet have the XDR buffer, so we will write the calldir
965          * out after we get the buffer from the 'struct rpc_rqst'
966          */
967         if (ntohl(calldir) == RPC_REPLY)
968                 transport->tcp_flags |= TCP_RPC_REPLY;
969         else
970                 transport->tcp_flags &= ~TCP_RPC_REPLY;
971         dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
972                         (transport->tcp_flags & TCP_RPC_REPLY) ?
973                                 "reply for" : "request with", calldir);
974         xs_tcp_check_fraghdr(transport);
975 }
976
977 static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
978                                      struct xdr_skb_reader *desc,
979                                      struct rpc_rqst *req)
980 {
981         struct sock_xprt *transport =
982                                 container_of(xprt, struct sock_xprt, xprt);
983         struct xdr_buf *rcvbuf;
984         size_t len;
985         ssize_t r;
986
987         rcvbuf = &req->rq_private_buf;
988
989         if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
990                 /*
991                  * Save the RPC direction in the XDR buffer
992                  */
993                 __be32  calldir = transport->tcp_flags & TCP_RPC_REPLY ?
994                                         htonl(RPC_REPLY) : 0;
995
996                 memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
997                         &calldir, sizeof(calldir));
998                 transport->tcp_copied += sizeof(calldir);
999                 transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1000         }
1001
1002         len = desc->count;
1003         if (len > transport->tcp_reclen - transport->tcp_offset) {
1004                 struct xdr_skb_reader my_desc;
1005
1006                 len = transport->tcp_reclen - transport->tcp_offset;
1007                 memcpy(&my_desc, desc, sizeof(my_desc));
1008                 my_desc.count = len;
1009                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1010                                           &my_desc, xdr_skb_read_bits);
1011                 desc->count -= r;
1012                 desc->offset += r;
1013         } else
1014                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1015                                           desc, xdr_skb_read_bits);
1016
1017         if (r > 0) {
1018                 transport->tcp_copied += r;
1019                 transport->tcp_offset += r;
1020         }
1021         if (r != len) {
1022                 /* Error when copying to the receive buffer,
1023                  * usually because we weren't able to allocate
1024                  * additional buffer pages. All we can do now
1025                  * is turn off TCP_RCV_COPY_DATA, so the request
1026                  * will not receive any additional updates,
1027                  * and time out.
1028                  * Any remaining data from this record will
1029                  * be discarded.
1030                  */
1031                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1032                 dprintk("RPC:       XID %08x truncated request\n",
1033                                 ntohl(transport->tcp_xid));
1034                 dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1035                                 "tcp_offset = %u, tcp_reclen = %u\n",
1036                                 xprt, transport->tcp_copied,
1037                                 transport->tcp_offset, transport->tcp_reclen);
1038                 return;
1039         }
1040
1041         dprintk("RPC:       XID %08x read %Zd bytes\n",
1042                         ntohl(transport->tcp_xid), r);
1043         dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1044                         "tcp_reclen = %u\n", xprt, transport->tcp_copied,
1045                         transport->tcp_offset, transport->tcp_reclen);
1046
1047         if (transport->tcp_copied == req->rq_private_buf.buflen)
1048                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1049         else if (transport->tcp_offset == transport->tcp_reclen) {
1050                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1051                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1052         }
1053 }
1054
1055 /*
1056  * Finds the request corresponding to the RPC xid and invokes the common
1057  * tcp read code to read the data.
1058  */
1059 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1060                                     struct xdr_skb_reader *desc)
1061 {
1062         struct sock_xprt *transport =
1063                                 container_of(xprt, struct sock_xprt, xprt);
1064         struct rpc_rqst *req;
1065
1066         dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1067
1068         /* Find and lock the request corresponding to this xid */
1069         spin_lock(&xprt->transport_lock);
1070         req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1071         if (!req) {
1072                 dprintk("RPC:       XID %08x request not found!\n",
1073                                 ntohl(transport->tcp_xid));
1074                 spin_unlock(&xprt->transport_lock);
1075                 return -1;
1076         }
1077
1078         xs_tcp_read_common(xprt, desc, req);
1079
1080         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1081                 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1082
1083         spin_unlock(&xprt->transport_lock);
1084         return 0;
1085 }
1086
1087 #if defined(CONFIG_NFS_V4_1)
1088 /*
1089  * Obtains an rpc_rqst previously allocated and invokes the common
1090  * tcp read code to read the data.  The result is placed in the callback
1091  * queue.
1092  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1093  * connection and return -1.
1094  */
1095 static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1096                                        struct xdr_skb_reader *desc)
1097 {
1098         struct sock_xprt *transport =
1099                                 container_of(xprt, struct sock_xprt, xprt);
1100         struct rpc_rqst *req;
1101
1102         req = xprt_alloc_bc_request(xprt);
1103         if (req == NULL) {
1104                 printk(KERN_WARNING "Callback slot table overflowed\n");
1105                 xprt_force_disconnect(xprt);
1106                 return -1;
1107         }
1108
1109         req->rq_xid = transport->tcp_xid;
1110         dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1111         xs_tcp_read_common(xprt, desc, req);
1112
1113         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1114                 struct svc_serv *bc_serv = xprt->bc_serv;
1115
1116                 /*
1117                  * Add callback request to callback list.  The callback
1118                  * service sleeps on the sv_cb_waitq waiting for new
1119                  * requests.  Wake it up after adding enqueing the
1120                  * request.
1121                  */
1122                 dprintk("RPC:       add callback request to list\n");
1123                 spin_lock(&bc_serv->sv_cb_lock);
1124                 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1125                 spin_unlock(&bc_serv->sv_cb_lock);
1126                 wake_up(&bc_serv->sv_cb_waitq);
1127         }
1128
1129         req->rq_private_buf.len = transport->tcp_copied;
1130
1131         return 0;
1132 }
1133
1134 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1135                                         struct xdr_skb_reader *desc)
1136 {
1137         struct sock_xprt *transport =
1138                                 container_of(xprt, struct sock_xprt, xprt);
1139
1140         return (transport->tcp_flags & TCP_RPC_REPLY) ?
1141                 xs_tcp_read_reply(xprt, desc) :
1142                 xs_tcp_read_callback(xprt, desc);
1143 }
1144 #else
/*
 * Variant built when CONFIG_NFS_V4_1 is not defined: all incoming data
 * is handed to the reply reader.
 */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
					struct xdr_skb_reader *desc)
{
	return xs_tcp_read_reply(xprt, desc);
}
1150 #endif /* CONFIG_NFS_V4_1 */
1151
1152 /*
1153  * Read data off the transport.  This can be either an RPC_CALL or an
1154  * RPC_REPLY.  Relay the processing to helper functions.
1155  */
1156 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1157                                     struct xdr_skb_reader *desc)
1158 {
1159         struct sock_xprt *transport =
1160                                 container_of(xprt, struct sock_xprt, xprt);
1161
1162         if (_xs_tcp_read_data(xprt, desc) == 0)
1163                 xs_tcp_check_fraghdr(transport);
1164         else {
1165                 /*
1166                  * The transport_lock protects the request handling.
1167                  * There's no need to hold it to update the tcp_flags.
1168                  */
1169                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1170         }
1171 }
1172
1173 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1174 {
1175         size_t len;
1176
1177         len = transport->tcp_reclen - transport->tcp_offset;
1178         if (len > desc->count)
1179                 len = desc->count;
1180         desc->count -= len;
1181         desc->offset += len;
1182         transport->tcp_offset += len;
1183         dprintk("RPC:       discarded %Zu bytes\n", len);
1184         xs_tcp_check_fraghdr(transport);
1185 }
1186
1187 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1188 {
1189         struct rpc_xprt *xprt = rd_desc->arg.data;
1190         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1191         struct xdr_skb_reader desc = {
1192                 .skb    = skb,
1193                 .offset = offset,
1194                 .count  = len,
1195         };
1196
1197         dprintk("RPC:       xs_tcp_data_recv started\n");
1198         do {
1199                 /* Read in a new fragment marker if necessary */
1200                 /* Can we ever really expect to get completely empty fragments? */
1201                 if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1202                         xs_tcp_read_fraghdr(xprt, &desc);
1203                         continue;
1204                 }
1205                 /* Read in the xid if necessary */
1206                 if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1207                         xs_tcp_read_xid(transport, &desc);
1208                         continue;
1209                 }
1210                 /* Read in the call/reply flag */
1211                 if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1212                         xs_tcp_read_calldir(transport, &desc);
1213                         continue;
1214                 }
1215                 /* Read in the request data */
1216                 if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1217                         xs_tcp_read_data(xprt, &desc);
1218                         continue;
1219                 }
1220                 /* Skip over any trailing bytes on short reads */
1221                 xs_tcp_read_discard(transport, &desc);
1222         } while (desc.count);
1223         dprintk("RPC:       xs_tcp_data_recv done\n");
1224         return len - desc.count;
1225 }
1226
1227 /**
1228  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1229  * @sk: socket with data to read
1230  * @bytes: how much data to read
1231  *
1232  */
1233 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1234 {
1235         struct rpc_xprt *xprt;
1236         read_descriptor_t rd_desc;
1237         int read;
1238
1239         dprintk("RPC:       xs_tcp_data_ready...\n");
1240
1241         read_lock(&sk->sk_callback_lock);
1242         if (!(xprt = xprt_from_sock(sk)))
1243                 goto out;
1244         if (xprt->shutdown)
1245                 goto out;
1246
1247         /* Any data means we had a useful conversation, so
1248          * the we don't need to delay the next reconnect
1249          */
1250         if (xprt->reestablish_timeout)
1251                 xprt->reestablish_timeout = 0;
1252
1253         /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1254         rd_desc.arg.data = xprt;
1255         do {
1256                 rd_desc.count = 65536;
1257                 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1258         } while (read > 0);
1259 out:
1260         read_unlock(&sk->sk_callback_lock);
1261 }
1262
1263 /*
1264  * Do the equivalent of linger/linger2 handling for dealing with
1265  * broken servers that don't close the socket in a timely
1266  * fashion
1267  */
1268 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1269                 unsigned long timeout)
1270 {
1271         struct sock_xprt *transport;
1272
1273         if (xprt_test_and_set_connecting(xprt))
1274                 return;
1275         set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1276         transport = container_of(xprt, struct sock_xprt, xprt);
1277         queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1278                            timeout);
1279 }
1280
1281 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1282 {
1283         struct sock_xprt *transport;
1284
1285         transport = container_of(xprt, struct sock_xprt, xprt);
1286
1287         if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1288             !cancel_delayed_work(&transport->connect_worker))
1289                 return;
1290         clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1291         xprt_clear_connecting(xprt);
1292 }
1293
1294 static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1295 {
1296         smp_mb__before_clear_bit();
1297         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1298         clear_bit(XPRT_CLOSING, &xprt->state);
1299         smp_mb__after_clear_bit();
1300         /* Mark transport as closed and wake up all pending tasks */
1301         xprt_disconnect_done(xprt);
1302 }
1303
1304 /**
1305  * xs_tcp_state_change - callback to handle TCP socket state changes
1306  * @sk: socket whose state has changed
1307  *
1308  */
1309 static void xs_tcp_state_change(struct sock *sk)
1310 {
1311         struct rpc_xprt *xprt;
1312
1313         read_lock(&sk->sk_callback_lock);
1314         if (!(xprt = xprt_from_sock(sk)))
1315                 goto out;
1316         dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1317         dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
1318                         sk->sk_state, xprt_connected(xprt),
1319                         sock_flag(sk, SOCK_DEAD),
1320                         sock_flag(sk, SOCK_ZAPPED));
1321
1322         switch (sk->sk_state) {
1323         case TCP_ESTABLISHED:
1324                 spin_lock_bh(&xprt->transport_lock);
1325                 if (!xprt_test_and_set_connected(xprt)) {
1326                         struct sock_xprt *transport = container_of(xprt,
1327                                         struct sock_xprt, xprt);
1328
1329                         /* Reset TCP record info */
1330                         transport->tcp_offset = 0;
1331                         transport->tcp_reclen = 0;
1332                         transport->tcp_copied = 0;
1333                         transport->tcp_flags =
1334                                 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1335
1336                         xprt_wake_pending_tasks(xprt, -EAGAIN);
1337                 }
1338                 spin_unlock_bh(&xprt->transport_lock);
1339                 break;
1340         case TCP_FIN_WAIT1:
1341                 /* The client initiated a shutdown of the socket */
1342                 xprt->connect_cookie++;
1343                 xprt->reestablish_timeout = 0;
1344                 set_bit(XPRT_CLOSING, &xprt->state);
1345                 smp_mb__before_clear_bit();
1346                 clear_bit(XPRT_CONNECTED, &xprt->state);
1347                 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1348                 smp_mb__after_clear_bit();
1349                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1350                 break;
1351         case TCP_CLOSE_WAIT:
1352                 /* The server initiated a shutdown of the socket */
1353                 xprt_force_disconnect(xprt);
1354         case TCP_SYN_SENT:
1355                 xprt->connect_cookie++;
1356         case TCP_CLOSING:
1357                 /*
1358                  * If the server closed down the connection, make sure that
1359                  * we back off before reconnecting
1360                  */
1361                 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1362                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1363                 break;
1364         case TCP_LAST_ACK:
1365                 set_bit(XPRT_CLOSING, &xprt->state);
1366                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1367                 smp_mb__before_clear_bit();
1368                 clear_bit(XPRT_CONNECTED, &xprt->state);
1369                 smp_mb__after_clear_bit();
1370                 break;
1371         case TCP_CLOSE:
1372                 xs_tcp_cancel_linger_timeout(xprt);
1373                 xs_sock_mark_closed(xprt);
1374         }
1375  out:
1376         read_unlock(&sk->sk_callback_lock);
1377 }
1378
1379 /**
1380  * xs_error_report - callback mainly for catching socket errors
1381  * @sk: socket
1382  */
1383 static void xs_error_report(struct sock *sk)
1384 {
1385         struct rpc_xprt *xprt;
1386
1387         read_lock(&sk->sk_callback_lock);
1388         if (!(xprt = xprt_from_sock(sk)))
1389                 goto out;
1390         dprintk("RPC:       %s client %p...\n"
1391                         "RPC:       error %d\n",
1392                         __func__, xprt, sk->sk_err);
1393         xprt_wake_pending_tasks(xprt, -EAGAIN);
1394 out:
1395         read_unlock(&sk->sk_callback_lock);
1396 }
1397
1398 static void xs_write_space(struct sock *sk)
1399 {
1400         struct socket *sock;
1401         struct rpc_xprt *xprt;
1402
1403         if (unlikely(!(sock = sk->sk_socket)))
1404                 return;
1405         clear_bit(SOCK_NOSPACE, &sock->flags);
1406
1407         if (unlikely(!(xprt = xprt_from_sock(sk))))
1408                 return;
1409         if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1410                 return;
1411
1412         xprt_write_space(xprt);
1413 }
1414
1415 /**
1416  * xs_udp_write_space - callback invoked when socket buffer space
1417  *                             becomes available
1418  * @sk: socket whose state has changed
1419  *
1420  * Called when more output buffer space is available for this socket.
1421  * We try not to wake our writers until they can make "significant"
1422  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1423  * with a bunch of small requests.
1424  */
1425 static void xs_udp_write_space(struct sock *sk)
1426 {
1427         read_lock(&sk->sk_callback_lock);
1428
1429         /* from net/core/sock.c:sock_def_write_space */
1430         if (sock_writeable(sk))
1431                 xs_write_space(sk);
1432
1433         read_unlock(&sk->sk_callback_lock);
1434 }
1435
1436 /**
1437  * xs_tcp_write_space - callback invoked when socket buffer space
1438  *                             becomes available
1439  * @sk: socket whose state has changed
1440  *
1441  * Called when more output buffer space is available for this socket.
1442  * We try not to wake our writers until they can make "significant"
1443  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1444  * with a bunch of small requests.
1445  */
1446 static void xs_tcp_write_space(struct sock *sk)
1447 {
1448         read_lock(&sk->sk_callback_lock);
1449
1450         /* from net/core/stream.c:sk_stream_write_space */
1451         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1452                 xs_write_space(sk);
1453
1454         read_unlock(&sk->sk_callback_lock);
1455 }
1456
1457 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1458 {
1459         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1460         struct sock *sk = transport->inet;
1461
1462         if (transport->rcvsize) {
1463                 sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1464                 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1465         }
1466         if (transport->sndsize) {
1467                 sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1468                 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1469                 sk->sk_write_space(sk);
1470         }
1471 }
1472
1473 /**
1474  * xs_udp_set_buffer_size - set send and receive limits
1475  * @xprt: generic transport
1476  * @sndsize: requested size of send buffer, in bytes
1477  * @rcvsize: requested size of receive buffer, in bytes
1478  *
1479  * Set socket send and receive buffer size limits.
1480  */
1481 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1482 {
1483         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1484
1485         transport->sndsize = 0;
1486         if (sndsize)
1487                 transport->sndsize = sndsize + 1024;
1488         transport->rcvsize = 0;
1489         if (rcvsize)
1490                 transport->rcvsize = rcvsize + 1024;
1491
1492         xs_udp_do_set_buffer_size(xprt);
1493 }
1494
1495 /**
1496  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1497  * @task: task that timed out
1498  *
1499  * Adjust the congestion window after a retransmit timeout has occurred.
1500  */
1501 static void xs_udp_timer(struct rpc_task *task)
1502 {
1503         xprt_adjust_cwnd(task, -ETIMEDOUT);
1504 }
1505
1506 static unsigned short xs_get_random_port(void)
1507 {
1508         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1509         unsigned short rand = (unsigned short) net_random() % range;
1510         return rand + xprt_min_resvport;
1511 }
1512
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 * Stores @port in the transport's address and refreshes the cached
 * peer port information.
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	struct sockaddr *peer;

	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	peer = xs_addr(xprt);
	rpc_set_port(peer, port);
	xs_update_peer_port(xprt);
}
1526
1527 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1528 {
1529         unsigned short port = transport->srcport;
1530
1531         if (port == 0 && transport->xprt.resvport)
1532                 port = xs_get_random_port();
1533         return port;
1534 }
1535
1536 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1537 {
1538         if (transport->srcport != 0)
1539                 transport->srcport = 0;
1540         if (!transport->xprt.resvport)
1541                 return 0;
1542         if (port <= xprt_min_resvport || port > xprt_max_resvport)
1543                 return xprt_max_resvport;
1544         return --port;
1545 }
1546
1547 static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1548 {
1549         struct sockaddr_in myaddr = {
1550                 .sin_family = AF_INET,
1551         };
1552         struct sockaddr_in *sa;
1553         int err, nloop = 0;
1554         unsigned short port = xs_get_srcport(transport, sock);
1555         unsigned short last;
1556
1557         sa = (struct sockaddr_in *)&transport->srcaddr;
1558         myaddr.sin_addr = sa->sin_addr;
1559         do {
1560                 myaddr.sin_port = htons(port);
1561                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1562                                                 sizeof(myaddr));
1563                 if (port == 0)
1564                         break;
1565                 if (err == 0) {
1566                         transport->srcport = port;
1567                         break;
1568                 }
1569                 last = port;
1570                 port = xs_next_srcport(transport, sock, port);
1571                 if (port > last)
1572                         nloop++;
1573         } while (err == -EADDRINUSE && nloop != 2);
1574         dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
1575                         __func__, &myaddr.sin_addr,
1576                         port, err ? "failed" : "ok", err);
1577         return err;
1578 }
1579
1580 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1581 {
1582         struct sockaddr_in6 myaddr = {
1583                 .sin6_family = AF_INET6,
1584         };
1585         struct sockaddr_in6 *sa;
1586         int err, nloop = 0;
1587         unsigned short port = xs_get_srcport(transport, sock);
1588         unsigned short last;
1589
1590         sa = (struct sockaddr_in6 *)&transport->srcaddr;
1591         myaddr.sin6_addr = sa->sin6_addr;
1592         do {
1593                 myaddr.sin6_port = htons(port);
1594                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1595                                                 sizeof(myaddr));
1596                 if (port == 0)
1597                         break;
1598                 if (err == 0) {
1599                         transport->srcport = port;
1600                         break;
1601                 }
1602                 last = port;
1603                 port = xs_next_srcport(transport, sock, port);
1604                 if (port > last)
1605                         nloop++;
1606         } while (err == -EADDRINUSE && nloop != 2);
1607         dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1608                 &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1609         return err;
1610 }
1611
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/* Lock class keys for RPC sockets: index 0 is AF_INET, index 1 is AF_INET6 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/* Give the locks of an IPv4 RPC socket their own lock class and name */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/* Give the locks of an IPv6 RPC socket their own lock class and name */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC both reclassify helpers are no-ops */
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif
1642
1643 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1644 {
1645         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1646
1647         if (!transport->inet) {
1648                 struct sock *sk = sock->sk;
1649
1650                 write_lock_bh(&sk->sk_callback_lock);
1651
1652                 xs_save_old_callbacks(transport, sk);
1653
1654                 sk->sk_user_data = xprt;
1655                 sk->sk_data_ready = xs_udp_data_ready;
1656                 sk->sk_write_space = xs_udp_write_space;
1657                 sk->sk_error_report = xs_error_report;
1658                 sk->sk_no_check = UDP_CSUM_NORCV;
1659                 sk->sk_allocation = GFP_ATOMIC;
1660
1661                 xprt_set_connected(xprt);
1662
1663                 /* Reset to new socket */
1664                 transport->sock = sock;
1665                 transport->inet = sk;
1666
1667                 write_unlock_bh(&sk->sk_callback_lock);
1668         }
1669         xs_udp_do_set_buffer_size(xprt);
1670 }
1671
1672 /**
1673  * xs_udp_connect_worker4 - set up a UDP socket
1674  * @work: RPC transport to connect
1675  *
1676  * Invoked by a work queue tasklet.
1677  */
1678 static void xs_udp_connect_worker4(struct work_struct *work)
1679 {
1680         struct sock_xprt *transport =
1681                 container_of(work, struct sock_xprt, connect_worker.work);
1682         struct rpc_xprt *xprt = &transport->xprt;
1683         struct socket *sock = transport->sock;
1684         int err, status = -EIO;
1685
1686         if (xprt->shutdown)
1687                 goto out;
1688
1689         /* Start by resetting any existing state */
1690         xs_reset_transport(transport);
1691
1692         err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1693         if (err < 0) {
1694                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1695                 goto out;
1696         }
1697         xs_reclassify_socket4(sock);
1698
1699         if (xs_bind4(transport, sock)) {
1700                 sock_release(sock);
1701                 goto out;
1702         }
1703
1704         dprintk("RPC:       worker connecting xprt %p via %s to "
1705                                 "%s (port %s)\n", xprt,
1706                         xprt->address_strings[RPC_DISPLAY_PROTO],
1707                         xprt->address_strings[RPC_DISPLAY_ADDR],
1708                         xprt->address_strings[RPC_DISPLAY_PORT]);
1709
1710         xs_udp_finish_connecting(xprt, sock);
1711         status = 0;
1712 out:
1713         xprt_clear_connecting(xprt);
1714         xprt_wake_pending_tasks(xprt, status);
1715 }
1716
1717 /**
1718  * xs_udp_connect_worker6 - set up a UDP socket
1719  * @work: RPC transport to connect
1720  *
1721  * Invoked by a work queue tasklet.
1722  */
1723 static void xs_udp_connect_worker6(struct work_struct *work)
1724 {
1725         struct sock_xprt *transport =
1726                 container_of(work, struct sock_xprt, connect_worker.work);
1727         struct rpc_xprt *xprt = &transport->xprt;
1728         struct socket *sock = transport->sock;
1729         int err, status = -EIO;
1730
1731         if (xprt->shutdown)
1732                 goto out;
1733
1734         /* Start by resetting any existing state */
1735         xs_reset_transport(transport);
1736
1737         err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1738         if (err < 0) {
1739                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1740                 goto out;
1741         }
1742         xs_reclassify_socket6(sock);
1743
1744         if (xs_bind6(transport, sock) < 0) {
1745                 sock_release(sock);
1746                 goto out;
1747         }
1748
1749         dprintk("RPC:       worker connecting xprt %p via %s to "
1750                                 "%s (port %s)\n", xprt,
1751                         xprt->address_strings[RPC_DISPLAY_PROTO],
1752                         xprt->address_strings[RPC_DISPLAY_ADDR],
1753                         xprt->address_strings[RPC_DISPLAY_PORT]);
1754
1755         xs_udp_finish_connecting(xprt, sock);
1756         status = 0;
1757 out:
1758         xprt_clear_connecting(xprt);
1759         xprt_wake_pending_tasks(xprt, status);
1760 }
1761
1762 /*
1763  * We need to preserve the port number so the reply cache on the server can
1764  * find our cached RPC replies when we get around to reconnecting.
1765  */
1766 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1767 {
1768         int result;
1769         struct sockaddr any;
1770
1771         dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1772
1773         /*
1774          * Disconnect the transport socket by doing a connect operation
1775          * with AF_UNSPEC.  This should return immediately...
1776          */
1777         memset(&any, 0, sizeof(any));
1778         any.sa_family = AF_UNSPEC;
1779         result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1780         if (!result)
1781                 xs_sock_mark_closed(xprt);
1782         else
1783                 dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1784                                 result);
1785 }
1786
1787 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1788 {
1789         unsigned int state = transport->inet->sk_state;
1790
1791         if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1792                 return;
1793         if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1794                 return;
1795         xs_abort_connection(xprt, transport);
1796 }
1797
1798 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1799 {
1800         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1801
1802         if (!transport->inet) {
1803                 struct sock *sk = sock->sk;
1804
1805                 write_lock_bh(&sk->sk_callback_lock);
1806
1807                 xs_save_old_callbacks(transport, sk);
1808
1809                 sk->sk_user_data = xprt;
1810                 sk->sk_data_ready = xs_tcp_data_ready;
1811                 sk->sk_state_change = xs_tcp_state_change;
1812                 sk->sk_write_space = xs_tcp_write_space;
1813                 sk->sk_error_report = xs_error_report;
1814                 sk->sk_allocation = GFP_ATOMIC;
1815
1816                 /* socket options */
1817                 sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1818                 sock_reset_flag(sk, SOCK_LINGER);
1819                 tcp_sk(sk)->linger2 = 0;
1820                 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1821
1822                 xprt_clear_connected(xprt);
1823
1824                 /* Reset to new socket */
1825                 transport->sock = sock;
1826                 transport->inet = sk;
1827
1828                 write_unlock_bh(&sk->sk_callback_lock);
1829         }
1830
1831         if (!xprt_bound(xprt))
1832                 return -ENOTCONN;
1833
1834         /* Tell the socket layer to start connecting... */
1835         xprt->stat.connect_count++;
1836         xprt->stat.connect_start = jiffies;
1837         return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1838 }
1839
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @xprt: RPC transport to connect
 * @transport: socket transport to connect
 * @create_sock: function to create a socket of the correct type
 *
 * Called by xs_tcp_connect_worker4() and xs_tcp_connect_worker6().
 *
 * If the transport has no socket, one is obtained from @create_sock;
 * otherwise the existing socket is reused after xs_tcp_reuse_connection().
 * Depending on the connect status, the function either returns with only
 * the connecting flag cleared (pending tasks are not woken here), or
 * clears the flag and wakes pending tasks with an error status.
 */
static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
                struct sock_xprt *transport,
                struct socket *(*create_sock)(struct rpc_xprt *,
                        struct sock_xprt *))
{
        struct socket *sock = transport->sock;
        int status = -EIO;

        /* Transport is being shut down: wake waiters with -EIO */
        if (xprt->shutdown)
                goto out;

        if (!sock) {
                /* No socket yet: drop any stale abort request and create one */
                clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
                sock = create_sock(xprt, transport);
                if (IS_ERR(sock)) {
                        status = PTR_ERR(sock);
                        goto out;
                }
        } else {
                int abort_and_exit;

                /* Remember whether an abort was requested before clearing it */
                abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
                                &xprt->state);
                /* "close" the socket, preserving the local port */
                xs_tcp_reuse_connection(xprt, transport);

                /* Abort requested: skip the connect, wake waiters with -EAGAIN */
                if (abort_and_exit)
                        goto out_eagain;
        }

        dprintk("RPC:       worker connecting xprt %p via %s to "
                                "%s (port %s)\n", xprt,
                        xprt->address_strings[RPC_DISPLAY_PROTO],
                        xprt->address_strings[RPC_DISPLAY_ADDR],
                        xprt->address_strings[RPC_DISPLAY_PORT]);

        status = xs_tcp_finish_connecting(xprt, sock);
        dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt),
                        sock->sk->sk_state);
        switch (status) {
        default:
                /* Unknown error: log it, then fall through to the
                 * -EADDRNOTAVAIL handling below.
                 */
                printk("%s: connect returned unhandled error %d\n",
                        __func__, status);
        case -EADDRNOTAVAIL:
                /* We're probably in TIME_WAIT. Get rid of existing socket,
                 * and retry
                 */
                set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
                xprt_force_disconnect(xprt);
                /* leaves the switch; waiters are woken with -EAGAIN below */
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ENETUNREACH:
                /* retry with existing socket, after a delay */
        case 0:
        case -EINPROGRESS:
        case -EALREADY:
                /* Only the connecting flag is cleared here; pending tasks
                 * are not woken by this function on these statuses.
                 */
                xprt_clear_connecting(xprt);
                return;
        case -EINVAL:
                /* Happens, for instance, if the user specified a link
                 * local IPv6 address without a scope-id.
                 */
                /* status stays -EINVAL and is handed to the waiters */
                goto out;
        }
out_eagain:
        status = -EAGAIN;
out:
        xprt_clear_connecting(xprt);
        xprt_wake_pending_tasks(xprt, status);
}
1920
1921 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1922                 struct sock_xprt *transport)
1923 {
1924         struct socket *sock;
1925         int err;
1926
1927         /* start from scratch */
1928         err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1929         if (err < 0) {
1930                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
1931                                 -err);
1932                 goto out_err;
1933         }
1934         xs_reclassify_socket4(sock);
1935
1936         if (xs_bind4(transport, sock) < 0) {
1937                 sock_release(sock);
1938                 goto out_err;
1939         }
1940         return sock;
1941 out_err:
1942         return ERR_PTR(-EIO);
1943 }
1944
1945 /**
1946  * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
1947  * @work: RPC transport to connect
1948  *
1949  * Invoked by a work queue tasklet.
1950  */
1951 static void xs_tcp_connect_worker4(struct work_struct *work)
1952 {
1953         struct sock_xprt *transport =
1954                 container_of(work, struct sock_xprt, connect_worker.work);
1955         struct rpc_xprt *xprt = &transport->xprt;
1956
1957         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
1958 }
1959
1960 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
1961                 struct sock_xprt *transport)
1962 {
1963         struct socket *sock;
1964         int err;
1965
1966         /* start from scratch */
1967         err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
1968         if (err < 0) {
1969                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
1970                                 -err);
1971                 goto out_err;
1972         }
1973         xs_reclassify_socket6(sock);
1974
1975         if (xs_bind6(transport, sock) < 0) {
1976                 sock_release(sock);
1977                 goto out_err;
1978         }
1979         return sock;
1980 out_err:
1981         return ERR_PTR(-EIO);
1982 }
1983
1984 /**
1985  * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
1986  * @work: RPC transport to connect
1987  *
1988  * Invoked by a work queue tasklet.
1989  */
1990 static void xs_tcp_connect_worker6(struct work_struct *work)
1991 {
1992         struct sock_xprt *transport =
1993                 container_of(work, struct sock_xprt, connect_worker.work);
1994         struct rpc_xprt *xprt = &transport->xprt;
1995
1996         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
1997 }
1998
1999 /**
2000  * xs_connect - connect a socket to a remote endpoint
2001  * @task: address of RPC task that manages state of connect request
2002  *
2003  * TCP: If the remote end dropped the connection, delay reconnecting.
2004  *
2005  * UDP socket connects are synchronous, but we use a work queue anyway
2006  * to guarantee that even unprivileged user processes can set up a
2007  * socket on a privileged port.
2008  *
2009  * If a UDP socket connect fails, the delay behavior here prevents
2010  * retry floods (hard mounts).
2011  */
2012 static void xs_connect(struct rpc_task *task)
2013 {
2014         struct rpc_xprt *xprt = task->tk_xprt;
2015         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2016
2017         if (xprt_test_and_set_connecting(xprt))
2018                 return;
2019
2020         if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
2021                 dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2022                                 "seconds\n",
2023                                 xprt, xprt->reestablish_timeout / HZ);
2024                 queue_delayed_work(rpciod_workqueue,
2025                                    &transport->connect_worker,
2026                                    xprt->reestablish_timeout);
2027                 xprt->reestablish_timeout <<= 1;
2028                 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2029                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2030                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2031                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2032         } else {
2033                 dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2034                 queue_delayed_work(rpciod_workqueue,
2035                                    &transport->connect_worker, 0);
2036         }
2037 }
2038
2039 static void xs_tcp_connect(struct rpc_task *task)
2040 {
2041         struct rpc_xprt *xprt = task->tk_xprt;
2042
2043         /* Exit if we need to wait for socket shutdown to complete */
2044         if (test_bit(XPRT_CLOSING, &xprt->state))
2045                 return;
2046         xs_connect(task);
2047 }
2048
2049 /**
2050  * xs_udp_print_stats - display UDP socket-specifc stats
2051  * @xprt: rpc_xprt struct containing statistics
2052  * @seq: output file
2053  *
2054  */
2055 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2056 {
2057         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2058
2059         seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2060                         transport->srcport,
2061                         xprt->stat.bind_count,
2062                         xprt->stat.sends,
2063                         xprt->stat.recvs,
2064                         xprt->stat.bad_xids,
2065                         xprt->stat.req_u,
2066                         xprt->stat.bklog_u);
2067 }
2068
2069 /**
2070  * xs_tcp_print_stats - display TCP socket-specifc stats
2071  * @xprt: rpc_xprt struct containing statistics
2072  * @seq: output file
2073  *
2074  */
2075 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2076 {
2077         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2078         long idle_time = 0;
2079
2080         if (xprt_connected(xprt))
2081                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
2082
2083         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2084                         transport->srcport,
2085                         xprt->stat.bind_count,
2086                         xprt->stat.connect_count,
2087                         xprt->stat.connect_time,
2088                         idle_time,
2089                         xprt->stat.sends,
2090                         xprt->stat.recvs,
2091                         xprt->stat.bad_xids,
2092                         xprt->stat.req_u,
2093                         xprt->stat.bklog_u);
2094 }
2095
2096 /*
2097  * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2098  * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2099  * to use the server side send routines.
2100  */
2101 static void *bc_malloc(struct rpc_task *task, size_t size)
2102 {
2103         struct page *page;
2104         struct rpc_buffer *buf;
2105
2106         BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2107         page = alloc_page(GFP_KERNEL);
2108
2109         if (!page)
2110                 return NULL;
2111
2112         buf = page_address(page);
2113         buf->len = PAGE_SIZE;
2114
2115         return buf->data;
2116 }
2117
2118 /*
2119  * Free the space allocated in the bc_alloc routine
2120  */
2121 static void bc_free(void *buffer)
2122 {
2123         struct rpc_buffer *buf;
2124
2125         if (!buffer)
2126                 return;
2127
2128         buf = container_of(buffer, struct rpc_buffer, data);
2129         free_page((unsigned long)buf);
2130 }
2131
2132 /*
2133  * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2134  * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2135  */
2136 static int bc_sendto(struct rpc_rqst *req)
2137 {
2138         int len;
2139         struct xdr_buf *xbufp = &req->rq_snd_buf;
2140         struct rpc_xprt *xprt = req->rq_xprt;
2141         struct sock_xprt *transport =
2142                                 container_of(xprt, struct sock_xprt, xprt);
2143         struct socket *sock = transport->sock;
2144         unsigned long headoff;
2145         unsigned long tailoff;
2146
2147         /*
2148          * Set up the rpc header and record marker stuff
2149          */
2150         xs_encode_tcp_record_marker(xbufp);
2151
2152         tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2153         headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2154         len = svc_send_common(sock, xbufp,
2155                               virt_to_page(xbufp->head[0].iov_base), headoff,
2156                               xbufp->tail[0].iov_base, tailoff);
2157
2158         if (len != xbufp->len) {
2159                 printk(KERN_NOTICE "Error sending entire callback!\n");
2160                 len = -EAGAIN;
2161         }
2162
2163         return len;
2164 }
2165
2166 /*
2167  * The send routine. Borrows from svc_send
2168  */
2169 static int bc_send_request(struct rpc_task *task)
2170 {
2171         struct rpc_rqst *req = task->tk_rqstp;
2172         struct svc_xprt *xprt;
2173         struct svc_sock         *svsk;
2174         u32                     len;
2175
2176         dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2177         /*
2178          * Get the server socket associated with this callback xprt
2179          */
2180         xprt = req->rq_xprt->bc_xprt;
2181         svsk = container_of(xprt, struct svc_sock, sk_xprt);
2182
2183         /*
2184          * Grab the mutex to serialize data as the connection is shared
2185          * with the fore channel
2186          */
2187         if (!mutex_trylock(&xprt->xpt_mutex)) {
2188                 rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2189                 if (!mutex_trylock(&xprt->xpt_mutex))
2190                         return -EAGAIN;
2191                 rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2192         }
2193         if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2194                 len = -ENOTCONN;
2195         else
2196                 len = bc_sendto(req);
2197         mutex_unlock(&xprt->xpt_mutex);
2198
2199         if (len > 0)
2200                 len = 0;
2201
2202         return len;
2203 }
2204
/*
 * The close routine. Since this is client initiated, we do nothing.
 * Installed as the .close operation of bc_tcp_ops.
 */

static void bc_close(struct rpc_xprt *xprt)
{
}
2212
/*
 * The xprt destroy routine. Again, because this connection is client
 * initiated, we do nothing.  Installed as the .destroy operation of
 * bc_tcp_ops.
 */

static void bc_destroy(struct rpc_xprt *xprt)
{
}
2221
2222 static struct rpc_xprt_ops xs_udp_ops = {
2223         .set_buffer_size        = xs_udp_set_buffer_size,
2224         .reserve_xprt           = xprt_reserve_xprt_cong,
2225         .release_xprt           = xprt_release_xprt_cong,
2226         .rpcbind                = rpcb_getport_async,
2227         .set_port               = xs_set_port,
2228         .connect                = xs_connect,
2229         .buf_alloc              = rpc_malloc,
2230         .buf_free               = rpc_free,
2231         .send_request           = xs_udp_send_request,
2232         .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
2233         .timer                  = xs_udp_timer,
2234         .release_request        = xprt_release_rqst_cong,
2235         .close                  = xs_close,
2236         .destroy                = xs_destroy,
2237         .print_stats            = xs_udp_print_stats,
2238 };
2239
2240 static struct rpc_xprt_ops xs_tcp_ops = {
2241         .reserve_xprt           = xprt_reserve_xprt,
2242         .release_xprt           = xs_tcp_release_xprt,
2243         .rpcbind                = rpcb_getport_async,
2244         .set_port               = xs_set_port,
2245         .connect                = xs_tcp_connect,
2246         .buf_alloc              = rpc_malloc,
2247         .buf_free               = rpc_free,
2248         .send_request           = xs_tcp_send_request,
2249         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2250         .close                  = xs_tcp_close,
2251         .destroy                = xs_destroy,
2252         .print_stats            = xs_tcp_print_stats,
2253 };
2254
2255 /*
2256  * The rpc_xprt_ops for the server backchannel
2257  */
2258
2259 static struct rpc_xprt_ops bc_tcp_ops = {
2260         .reserve_xprt           = xprt_reserve_xprt,
2261         .release_xprt           = xprt_release_xprt,
2262         .buf_alloc              = bc_malloc,
2263         .buf_free               = bc_free,
2264         .send_request           = bc_send_request,
2265         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
2266         .close                  = bc_close,
2267         .destroy                = bc_destroy,
2268         .print_stats            = xs_tcp_print_stats,
2269 };
2270
2271 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2272                                       unsigned int slot_table_size)
2273 {
2274         struct rpc_xprt *xprt;
2275         struct sock_xprt *new;
2276
2277         if (args->addrlen > sizeof(xprt->addr)) {
2278                 dprintk("RPC:       xs_setup_xprt: address too large\n");
2279                 return ERR_PTR(-EBADF);
2280         }
2281
2282         new = kzalloc(sizeof(*new), GFP_KERNEL);
2283         if (new == NULL) {
2284                 dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2285                                 "rpc_xprt\n");
2286                 return ERR_PTR(-ENOMEM);
2287         }
2288         xprt = &new->xprt;
2289
2290         xprt->max_reqs = slot_table_size;
2291         xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
2292         if (xprt->slot == NULL) {
2293                 kfree(xprt);
2294                 dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
2295                                 "table\n");
2296                 return ERR_PTR(-ENOMEM);
2297         }
2298
2299         memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2300         xprt->addrlen = args->addrlen;
2301         if (args->srcaddr)
2302                 memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2303
2304         return xprt;
2305 }
2306
2307 static const struct rpc_timeout xs_udp_default_timeout = {
2308         .to_initval = 5 * HZ,
2309         .to_maxval = 30 * HZ,
2310         .to_increment = 5 * HZ,
2311         .to_retries = 5,
2312 };
2313
2314 /**
2315  * xs_setup_udp - Set up transport to use a UDP socket
2316  * @args: rpc transport creation arguments
2317  *
2318  */
2319 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2320 {
2321         struct sockaddr *addr = args->dstaddr;
2322         struct rpc_xprt *xprt;
2323         struct sock_xprt *transport;
2324
2325         xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2326         if (IS_ERR(xprt))
2327                 return xprt;
2328         transport = container_of(xprt, struct sock_xprt, xprt);
2329
2330         xprt->prot = IPPROTO_UDP;
2331         xprt->tsh_size = 0;
2332         /* XXX: header size can vary due to auth type, IPv6, etc. */
2333         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2334
2335         xprt->bind_timeout = XS_BIND_TO;
2336         xprt->connect_timeout = XS_UDP_CONN_TO;
2337         xprt->reestablish_timeout = XS_UDP_REEST_TO;
2338         xprt->idle_timeout = XS_IDLE_DISC_TO;
2339
2340         xprt->ops = &xs_udp_ops;
2341
2342         xprt->timeout = &xs_udp_default_timeout;
2343
2344         switch (addr->sa_family) {
2345         case AF_INET:
2346                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2347                         xprt_set_bound(xprt);
2348
2349                 INIT_DELAYED_WORK(&transport->connect_worker,
2350                                         xs_udp_connect_worker4);
2351                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2352                 break;
2353         case AF_INET6:
2354                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2355                         xprt_set_bound(xprt);
2356
2357                 INIT_DELAYED_WORK(&transport->connect_worker,
2358                                         xs_udp_connect_worker6);
2359                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2360                 break;
2361         default:
2362                 kfree(xprt);
2363                 return ERR_PTR(-EAFNOSUPPORT);
2364         }
2365
2366         if (xprt_bound(xprt))
2367                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2368                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2369                                 xprt->address_strings[RPC_DISPLAY_PORT],
2370                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2371         else
2372                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2373                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2374                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2375
2376         if (try_module_get(THIS_MODULE))
2377                 return xprt;
2378
2379         kfree(xprt->slot);
2380         kfree(xprt);
2381         return ERR_PTR(-EINVAL);
2382 }
2383
2384 static const struct rpc_timeout xs_tcp_default_timeout = {
2385         .to_initval = 60 * HZ,
2386         .to_maxval = 60 * HZ,
2387         .to_retries = 2,
2388 };
2389
2390 /**
2391  * xs_setup_tcp - Set up transport to use a TCP socket
2392  * @args: rpc transport creation arguments
2393  *
2394  */
2395 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2396 {
2397         struct sockaddr *addr = args->dstaddr;
2398         struct rpc_xprt *xprt;
2399         struct sock_xprt *transport;
2400
2401         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2402         if (IS_ERR(xprt))
2403                 return xprt;
2404         transport = container_of(xprt, struct sock_xprt, xprt);
2405
2406         xprt->prot = IPPROTO_TCP;
2407         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2408         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2409
2410         xprt->bind_timeout = XS_BIND_TO;
2411         xprt->connect_timeout = XS_TCP_CONN_TO;
2412         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2413         xprt->idle_timeout = XS_IDLE_DISC_TO;
2414
2415         xprt->ops = &xs_tcp_ops;
2416         xprt->timeout = &xs_tcp_default_timeout;
2417
2418         switch (addr->sa_family) {
2419         case AF_INET:
2420                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2421                         xprt_set_bound(xprt);
2422
2423                 INIT_DELAYED_WORK(&transport->connect_worker,
2424                                         xs_tcp_connect_worker4);
2425                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2426                 break;
2427         case AF_INET6:
2428                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2429                         xprt_set_bound(xprt);
2430
2431                 INIT_DELAYED_WORK(&transport->connect_worker,
2432                                         xs_tcp_connect_worker6);
2433                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2434                 break;
2435         default:
2436                 kfree(xprt);
2437                 return ERR_PTR(-EAFNOSUPPORT);
2438         }
2439
2440         if (xprt_bound(xprt))
2441                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2442                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2443                                 xprt->address_strings[RPC_DISPLAY_PORT],
2444                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2445         else
2446                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2447                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2448                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2449
2450
2451         if (try_module_get(THIS_MODULE))
2452                 return xprt;
2453
2454         kfree(xprt->slot);
2455         kfree(xprt);
2456         return ERR_PTR(-EINVAL);
2457 }
2458
2459 /**
2460  * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2461  * @args: rpc transport creation arguments
2462  *
2463  */
2464 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2465 {
2466         struct sockaddr *addr = args->dstaddr;
2467         struct rpc_xprt *xprt;
2468         struct sock_xprt *transport;
2469         struct svc_sock *bc_sock;
2470
2471         if (!args->bc_xprt)
2472                 ERR_PTR(-EINVAL);
2473
2474         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2475         if (IS_ERR(xprt))
2476                 return xprt;
2477         transport = container_of(xprt, struct sock_xprt, xprt);
2478
2479         xprt->prot = IPPROTO_TCP;
2480         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2481         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2482         xprt->timeout = &xs_tcp_default_timeout;
2483
2484         /* backchannel */
2485         xprt_set_bound(xprt);
2486         xprt->bind_timeout = 0;
2487         xprt->connect_timeout = 0;
2488         xprt->reestablish_timeout = 0;
2489         xprt->idle_timeout = 0;
2490
2491         /*
2492          * The backchannel uses the same socket connection as the
2493          * forechannel
2494          */
2495         xprt->bc_xprt = args->bc_xprt;
2496         bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2497         bc_sock->sk_bc_xprt = xprt;
2498         transport->sock = bc_sock->sk_sock;
2499         transport->inet = bc_sock->sk_sk;
2500
2501         xprt->ops = &bc_tcp_ops;
2502
2503         switch (addr->sa_family) {
2504         case AF_INET:
2505                 xs_format_peer_addresses(xprt, "tcp",
2506                                          RPCBIND_NETID_TCP);
2507                 break;
2508         case AF_INET6:
2509                 xs_format_peer_addresses(xprt, "tcp",
2510                                    RPCBIND_NETID_TCP6);
2511                 break;
2512         default:
2513                 kfree(xprt);
2514                 return ERR_PTR(-EAFNOSUPPORT);
2515         }
2516
2517         if (xprt_bound(xprt))
2518                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2519                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2520                                 xprt->address_strings[RPC_DISPLAY_PORT],
2521                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2522         else
2523                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2524                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2525                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2526
2527         /*
2528          * Since we don't want connections for the backchannel, we set
2529          * the xprt status to connected
2530          */
2531         xprt_set_connected(xprt);
2532
2533
2534         if (try_module_get(THIS_MODULE))
2535                 return xprt;
2536         kfree(xprt->slot);
2537         kfree(xprt);
2538         return ERR_PTR(-EINVAL);
2539 }
2540
2541 static struct xprt_class        xs_udp_transport = {
2542         .list           = LIST_HEAD_INIT(xs_udp_transport.list),
2543         .name           = "udp",
2544         .owner          = THIS_MODULE,
2545         .ident          = XPRT_TRANSPORT_UDP,
2546         .setup          = xs_setup_udp,
2547 };
2548
2549 static struct xprt_class        xs_tcp_transport = {
2550         .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
2551         .name           = "tcp",
2552         .owner          = THIS_MODULE,
2553         .ident          = XPRT_TRANSPORT_TCP,
2554         .setup          = xs_setup_tcp,
2555 };
2556
2557 static struct xprt_class        xs_bc_tcp_transport = {
2558         .list           = LIST_HEAD_INIT(xs_bc_tcp_transport.list),
2559         .name           = "tcp NFSv4.1 backchannel",
2560         .owner          = THIS_MODULE,
2561         .ident          = XPRT_TRANSPORT_BC_TCP,
2562         .setup          = xs_setup_bc_tcp,
2563 };
2564
2565 /**
2566  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2567  *
2568  */
2569 int init_socket_xprt(void)
2570 {
2571 #ifdef RPC_DEBUG
2572         if (!sunrpc_table_header)
2573                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
2574 #endif
2575
2576         xprt_register_transport(&xs_udp_transport);
2577         xprt_register_transport(&xs_tcp_transport);
2578         xprt_register_transport(&xs_bc_tcp_transport);
2579
2580         return 0;
2581 }
2582
2583 /**
2584  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2585  *
2586  */
2587 void cleanup_socket_xprt(void)
2588 {
2589 #ifdef RPC_DEBUG
2590         if (sunrpc_table_header) {
2591                 unregister_sysctl_table(sunrpc_table_header);
2592                 sunrpc_table_header = NULL;
2593         }
2594 #endif
2595
2596         xprt_unregister_transport(&xs_udp_transport);
2597         xprt_unregister_transport(&xs_tcp_transport);
2598         xprt_unregister_transport(&xs_bc_tcp_transport);
2599 }
2600
2601 static int param_set_uint_minmax(const char *val, struct kernel_param *kp,
2602                 unsigned int min, unsigned int max)
2603 {
2604         unsigned long num;
2605         int ret;
2606
2607         if (!val)
2608                 return -EINVAL;
2609         ret = strict_strtoul(val, 0, &num);
2610         if (ret == -EINVAL || num < min || num > max)
2611                 return -EINVAL;
2612         *((unsigned int *)kp->arg) = num;
2613         return 0;
2614 }
2615
2616 static int param_set_portnr(const char *val, struct kernel_param *kp)
2617 {
2618         return param_set_uint_minmax(val, kp,
2619                         RPC_MIN_RESVPORT,
2620                         RPC_MAX_RESVPORT);
2621 }
2622
/*
 * Getter for "portnr" module parameters: delegates to param_get_uint()
 * and passes its result straight back.
 */
static int param_get_portnr(char *buffer, struct kernel_param *kp)
{
	int ret = param_get_uint(buffer, kp);

	return ret;
}
2627 #define param_check_portnr(name, p) \
2628         __param_check(name, p, unsigned int);
2629
2630 module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
2631 module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
2632
2633 static int param_set_slot_table_size(const char *val, struct kernel_param *kp)
2634 {
2635         return param_set_uint_minmax(val, kp,
2636                         RPC_MIN_SLOT_TABLE,
2637                         RPC_MAX_SLOT_TABLE);
2638 }
2639
/*
 * Getter for "slot_table_size" module parameters: delegates to
 * param_get_uint() and passes its result straight back.
 */
static int param_get_slot_table_size(char *buffer, struct kernel_param *kp)
{
	int ret = param_get_uint(buffer, kp);

	return ret;
}
2644 #define param_check_slot_table_size(name, p) \
2645         __param_check(name, p, unsigned int);
2646
2647 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
2648                    slot_table_size, 0644);
2649 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
2650                    slot_table_size, 0644);
2651