ceph: messenger library (fs/ceph/messenger.c)
1 #include "ceph_debug.h"
2
3 #include <linux/crc32c.h>
4 #include <linux/ctype.h>
5 #include <linux/highmem.h>
6 #include <linux/inet.h>
7 #include <linux/kthread.h>
8 #include <linux/net.h>
9 #include <linux/socket.h>
10 #include <linux/string.h>
11 #include <net/tcp.h>
12
13 #include "super.h"
14 #include "messenger.h"
15
16 /*
17  * Ceph uses the messenger to exchange ceph_msg messages with other
18  * hosts in the system.  The messenger provides ordered and reliable
19  * delivery.  We tolerate TCP disconnects by reconnecting (with
20  * exponential backoff) in the case of a fault (disconnection, bad
21  * crc, protocol error).  Acks allow sent messages to be discarded by
22  * the sender.
23  */
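
/*
 * Roughly, the wire protocol implemented below works like this: after a
 * TCP connect we exchange a banner (CEPH_BANNER), our advertised address,
 * and a connect/connect-reply handshake that negotiates connect_seq and
 * global_seq.  After that the stream is a sequence of tagged items:
 * TAG_MSG (header + front + middle + data pages + footer, each part
 * covered by a crc32c), TAG_ACK (a seq number that lets the peer drop
 * already-delivered messages from its resend queue), and TAG_KEEPALIVE.
 */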
24
25 /* static tag bytes (protocol control messages) */
26 static char tag_msg = CEPH_MSGR_TAG_MSG;
27 static char tag_ack = CEPH_MSGR_TAG_ACK;
28 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
29
30
31 static void queue_con(struct ceph_connection *con);
32 static void con_work(struct work_struct *);
33 static void ceph_fault(struct ceph_connection *con);
34
35 const char *ceph_name_type_str(int t)
36 {
37         switch (t) {
38         case CEPH_ENTITY_TYPE_MON: return "mon";
39         case CEPH_ENTITY_TYPE_MDS: return "mds";
40         case CEPH_ENTITY_TYPE_OSD: return "osd";
41         case CEPH_ENTITY_TYPE_CLIENT: return "client";
42         case CEPH_ENTITY_TYPE_ADMIN: return "admin";
43         default: return "???";
44         }
45 }
46
47 /*
48  * nicely render a sockaddr as a string.
49  */
50 #define MAX_ADDR_STR 20
51 static char addr_str[MAX_ADDR_STR][64];  /* big enough for a full IPv6 addr + port */
52 static DEFINE_SPINLOCK(addr_str_lock);
53 static int last_addr_str;
54
55 const char *pr_addr(const struct sockaddr_storage *ss)
56 {
57         int i;
58         char *s;
59         struct sockaddr_in *in4 = (void *)ss;
60         unsigned char *quad = (void *)&in4->sin_addr.s_addr;
61         struct sockaddr_in6 *in6 = (void *)ss;
62
63         spin_lock(&addr_str_lock);
64         i = last_addr_str++;
65         if (last_addr_str == MAX_ADDR_STR)
66                 last_addr_str = 0;
67         spin_unlock(&addr_str_lock);
68         s = addr_str[i];
69
70         switch (ss->ss_family) {
71         case AF_INET:
72                 sprintf(s, "%u.%u.%u.%u:%u",
73                         (unsigned int)quad[0],
74                         (unsigned int)quad[1],
75                         (unsigned int)quad[2],
76                         (unsigned int)quad[3],
77                         (unsigned int)ntohs(in4->sin_port));
78                 break;
79
80         case AF_INET6:
81                 sprintf(s, "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%u",
82                         ntohs(in6->sin6_addr.s6_addr16[0]),
83                         ntohs(in6->sin6_addr.s6_addr16[1]),
84                         ntohs(in6->sin6_addr.s6_addr16[2]),
85                         ntohs(in6->sin6_addr.s6_addr16[3]),
86                         ntohs(in6->sin6_addr.s6_addr16[4]),
87                         ntohs(in6->sin6_addr.s6_addr16[5]),
88                         ntohs(in6->sin6_addr.s6_addr16[6]),
89                         ntohs(in6->sin6_addr.s6_addr16[7]),
90                         (unsigned int)ntohs(in6->sin6_port));
91                 break;
92
93         default:
94                 sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family);
95         }
96
97         return s;
98 }
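
/*
 * For example, pr_addr() renders an IPv4 peer as "192.168.0.1:6789" and an
 * unknown family as "(unknown sockaddr family N)".  Note that the result
 * points into a small rotating pool of static buffers, so it is only valid
 * until MAX_ADDR_STR later calls reuse the slot; it is meant for debug
 * output, not for storage.
 */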
99
100 /*
101  * work queue for all reading and writing to/from the socket.
102  */
103 struct workqueue_struct *ceph_msgr_wq;
104
105 int __init ceph_msgr_init(void)
106 {
107         ceph_msgr_wq = create_workqueue("ceph-msgr");
108         /* create_workqueue() returns NULL on failure (not an ERR_PTR),
109          * so check for that and return -ENOMEM. */
110         if (!ceph_msgr_wq) {
111                 pr_err("msgr_init failed to create workqueue\n");
112                 return -ENOMEM;
113         }
114         return 0;
115 }
116
117 void ceph_msgr_exit(void)
118 {
119         destroy_workqueue(ceph_msgr_wq);
120 }
121
122 /*
123  * socket callback functions
124  */
125
126 /* data available on socket, or listen socket received a connect */
127 static void ceph_data_ready(struct sock *sk, int count_unused)
128 {
129         struct ceph_connection *con =
130                 (struct ceph_connection *)sk->sk_user_data;
131         if (sk->sk_state != TCP_CLOSE_WAIT) {
132                 dout("ceph_data_ready on %p state = %lu, queueing work\n",
133                      con, con->state);
134                 queue_con(con);
135         }
136 }
137
138 /* socket has buffer space for writing */
139 static void ceph_write_space(struct sock *sk)
140 {
141         struct ceph_connection *con =
142                 (struct ceph_connection *)sk->sk_user_data;
143
144         /* only queue to workqueue if there is data we want to write. */
145         if (test_bit(WRITE_PENDING, &con->state)) {
146                 dout("ceph_write_space %p queueing write work\n", con);
147                 queue_con(con);
148         } else {
149                 dout("ceph_write_space %p nothing to write\n", con);
150         }
151
152         /* since we have our own write_space, clear the SOCK_NOSPACE flag */
153         clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
154 }
155
156 /* socket's state has changed */
157 static void ceph_state_change(struct sock *sk)
158 {
159         struct ceph_connection *con =
160                 (struct ceph_connection *)sk->sk_user_data;
161
162         dout("ceph_state_change %p state = %lu sk_state = %u\n",
163              con, con->state, sk->sk_state);
164
165         if (test_bit(CLOSED, &con->state))
166                 return;
167
168         switch (sk->sk_state) {
169         case TCP_CLOSE:
170                 dout("ceph_state_change TCP_CLOSE\n");   /* fall through */
171         case TCP_CLOSE_WAIT:
172                 dout("ceph_state_change TCP_CLOSE_WAIT\n");
173                 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
174                         if (test_bit(CONNECTING, &con->state))
175                                 con->error_msg = "connection failed";
176                         else
177                                 con->error_msg = "socket closed";
178                         queue_con(con);
179                 }
180                 break;
181         case TCP_ESTABLISHED:
182                 dout("ceph_state_change TCP_ESTABLISHED\n");
183                 queue_con(con);
184                 break;
185         }
186 }
187
188 /*
189  * set up socket callbacks
190  */
191 static void set_sock_callbacks(struct socket *sock,
192                                struct ceph_connection *con)
193 {
194         struct sock *sk = sock->sk;
195         sk->sk_user_data = (void *)con;
196         sk->sk_data_ready = ceph_data_ready;
197         sk->sk_write_space = ceph_write_space;
198         sk->sk_state_change = ceph_state_change;
199 }
200
201
202 /*
203  * socket helpers
204  */
205
206 /*
207  * initiate connection to a remote socket.
208  */
209 static struct socket *ceph_tcp_connect(struct ceph_connection *con)
210 {
211         struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.in_addr;
212         struct socket *sock;
213         int ret;
214
215         BUG_ON(con->sock);
216         ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
217         if (ret)
218                 return ERR_PTR(ret);
219         con->sock = sock;
220         sock->sk->sk_allocation = GFP_NOFS;
221
222         set_sock_callbacks(sock, con);
223
224         dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
225
226         ret = sock->ops->connect(sock, paddr, sizeof(con->peer_addr.in_addr),
                                 O_NONBLOCK);  /* full sockaddr_storage size; covers IPv6 too */
227         if (ret == -EINPROGRESS) {
228                 dout("connect %s EINPROGRESS sk_state = %u\n",
229                      pr_addr(&con->peer_addr.in_addr),
230                      sock->sk->sk_state);
231                 ret = 0;
232         }
233         if (ret < 0) {
234                 pr_err("connect %s error %d\n",
235                        pr_addr(&con->peer_addr.in_addr), ret);
236                 sock_release(sock);
237                 con->sock = NULL;
238                 con->error_msg = "connect error";
239         }
240
241         if (ret < 0)
242                 return ERR_PTR(ret);
243         return sock;
244 }
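
/*
 * Note that the connect above is non-blocking (O_NONBLOCK), so
 * -EINPROGRESS is the normal result; the handshake then proceeds when
 * ceph_state_change() sees TCP_ESTABLISHED and queues the connection for
 * con_work(), which continues via try_write()/try_read().
 */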
245
246 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
247 {
248         struct kvec iov = {buf, len};
249         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
250
251         return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
252 }
253
254 /*
255  * write something.  @more is true if caller will be sending more data
256  * shortly.
257  */
258 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
259                      size_t kvlen, size_t len, int more)
260 {
261         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
262
263         if (more)
264                 msg.msg_flags |= MSG_MORE;
265         else
266                 msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
267
268         return kernel_sendmsg(sock, &msg, iov, kvlen, len);
269 }
270
271
272 /*
273  * Shutdown/close the socket for the given connection.
274  */
275 static int con_close_socket(struct ceph_connection *con)
276 {
277         int rc;
278
279         dout("con_close_socket on %p sock %p\n", con, con->sock);
280         if (!con->sock)
281                 return 0;
282         set_bit(SOCK_CLOSED, &con->state);
283         rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
284         sock_release(con->sock);
285         con->sock = NULL;
286         clear_bit(SOCK_CLOSED, &con->state);
287         return rc;
288 }
289
290 /*
291  * Reset a connection.  Discard all incoming and outgoing messages
292  * and clear *_seq state.
293  */
294 static void ceph_msg_remove(struct ceph_msg *msg)
295 {
296         list_del_init(&msg->list_head);
297         ceph_msg_put(msg);
298 }
299 static void ceph_msg_remove_list(struct list_head *head)
300 {
301         while (!list_empty(head)) {
302                 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
303                                                         list_head);
304                 ceph_msg_remove(msg);
305         }
306 }
307
308 static void reset_connection(struct ceph_connection *con)
309 {
310         /* reset connection, out_queue, msg_ and connect_seq */
311         /* discard existing out_queue and msg_seq */
312         mutex_lock(&con->out_mutex);
313         ceph_msg_remove_list(&con->out_queue);
314         ceph_msg_remove_list(&con->out_sent);
315
316         con->connect_seq = 0;
317         con->out_seq = 0;
318         con->out_msg = NULL;
319         con->in_seq = 0;
320         mutex_unlock(&con->out_mutex);
321 }
322
323 /*
324  * mark a peer down.  drop any open connections.
325  */
326 void ceph_con_close(struct ceph_connection *con)
327 {
328         dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
329         set_bit(CLOSED, &con->state);  /* in case there's queued work */
330         clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
331         reset_connection(con);
332         queue_con(con);
333 }
334
335 /*
336  * clean up connection state
337  */
338 void ceph_con_shutdown(struct ceph_connection *con)
339 {
340         dout("con_shutdown %p\n", con);
341         reset_connection(con);
342         set_bit(DEAD, &con->state);
343         con_close_socket(con); /* silently ignore errors */
344 }
345
346 /*
347  * Reopen a closed connection, with a new peer address.
348  */
349 void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
350 {
351         dout("con_open %p %s\n", con, pr_addr(&addr->in_addr));
352         set_bit(OPENING, &con->state);
353         clear_bit(CLOSED, &con->state);
354         memcpy(&con->peer_addr, addr, sizeof(*addr));
355         queue_con(con);
356 }
357
358 /*
359  * generic get/put
360  */
361 struct ceph_connection *ceph_con_get(struct ceph_connection *con)
362 {
363         dout("con_get %p nref = %d -> %d\n", con,
364              atomic_read(&con->nref), atomic_read(&con->nref) + 1);
365         if (atomic_inc_not_zero(&con->nref))
366                 return con;
367         return NULL;
368 }
369
370 void ceph_con_put(struct ceph_connection *con)
371 {
372         dout("con_put %p nref = %d -> %d\n", con,
373              atomic_read(&con->nref), atomic_read(&con->nref) - 1);
374         BUG_ON(atomic_read(&con->nref) == 0);
375         if (atomic_dec_and_test(&con->nref)) {
376                 ceph_con_shutdown(con);
377                 kfree(con);
378         }
379 }
380
381 /*
382  * initialize a new connection.
383  */
384 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
385 {
386         dout("con_init %p\n", con);
387         memset(con, 0, sizeof(*con));
388         atomic_set(&con->nref, 1);
389         con->msgr = msgr;
390         mutex_init(&con->out_mutex);
391         INIT_LIST_HEAD(&con->out_queue);
392         INIT_LIST_HEAD(&con->out_sent);
393         INIT_DELAYED_WORK(&con->work, con_work);
394 }
395
396
397 /*
398  * We maintain a global counter to order connection attempts.  Get
399  * a unique seq greater than @gt.
400  */
401 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
402 {
403         u32 ret;
404
405         spin_lock(&msgr->global_seq_lock);
406         if (msgr->global_seq < gt)
407                 msgr->global_seq = gt;
408         ret = ++msgr->global_seq;
409         spin_unlock(&msgr->global_seq_lock);
410         return ret;
411 }
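
/*
 * Example: if msgr->global_seq is currently 5, get_global_seq(msgr, 0)
 * returns 6, while get_global_seq(msgr, 9) first bumps the counter to 9
 * and then returns 10.  This is how the RETRY_GLOBAL handling below jumps
 * past the peer's idea of our global_seq.
 */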
412
413
414 /*
415  * Prepare footer for currently outgoing message, and finish things
416  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
417  */
418 static void prepare_write_message_footer(struct ceph_connection *con, int v)
419 {
420         struct ceph_msg *m = con->out_msg;
421
422         dout("prepare_write_message_footer %p\n", con);
423         con->out_kvec_is_msg = true;
424         con->out_kvec[v].iov_base = &m->footer;
425         con->out_kvec[v].iov_len = sizeof(m->footer);
426         con->out_kvec_bytes += sizeof(m->footer);
427         con->out_kvec_left++;
428         con->out_more = m->more_to_follow;
429         con->out_msg = NULL;   /* we're done with this one */
430 }
431
432 /*
433  * Prepare headers for the next outgoing message.
434  */
435 static void prepare_write_message(struct ceph_connection *con)
436 {
437         struct ceph_msg *m;
438         int v = 0;
439
440         con->out_kvec_bytes = 0;
441         con->out_kvec_is_msg = true;
442
443         /* Sneak an ack in there first?  If we can get it into the same
444          * TCP packet that's a good thing. */
445         if (con->in_seq > con->in_seq_acked) {
446                 con->in_seq_acked = con->in_seq;
447                 con->out_kvec[v].iov_base = &tag_ack;
448                 con->out_kvec[v++].iov_len = 1;
449                 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
450                 con->out_kvec[v].iov_base = &con->out_temp_ack;
451                 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
452                 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
453         }
454
455         /* move message to sending/sent list */
456         m = list_first_entry(&con->out_queue,
457                        struct ceph_msg, list_head);
458         list_move_tail(&m->list_head, &con->out_sent);
459         con->out_msg = m;   /* we don't bother taking a reference here. */
460
461         m->hdr.seq = cpu_to_le64(++con->out_seq);
462
463         dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
464              m, con->out_seq, le16_to_cpu(m->hdr.type),
465              le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
466              le32_to_cpu(m->hdr.data_len),
467              m->nr_pages);
468         BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
469
470         /* tag + hdr + front + middle */
471         con->out_kvec[v].iov_base = &tag_msg;
472         con->out_kvec[v++].iov_len = 1;
473         con->out_kvec[v].iov_base = &m->hdr;
474         con->out_kvec[v++].iov_len = sizeof(m->hdr);
475         con->out_kvec[v++] = m->front;
476         if (m->middle)
477                 con->out_kvec[v++] = m->middle->vec;
478         con->out_kvec_left = v;
479         con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
480                 (m->middle ? m->middle->vec.iov_len : 0);
481         con->out_kvec_cur = con->out_kvec;
482
483         /* fill in crc (except data pages), footer */
484         con->out_msg->hdr.crc =
485                 cpu_to_le32(crc32c(0, (void *)&m->hdr,
486                                       sizeof(m->hdr) - sizeof(m->hdr.crc)));
487         con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
488         con->out_msg->footer.front_crc =
489                 cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
490         if (m->middle)
491                 con->out_msg->footer.middle_crc =
492                         cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
493                                            m->middle->vec.iov_len));
494         else
495                 con->out_msg->footer.middle_crc = 0;
496         con->out_msg->footer.data_crc = 0;
497         dout("prepare_write_message front_crc %u middle_crc %u\n",
498              le32_to_cpu(con->out_msg->footer.front_crc),
499              le32_to_cpu(con->out_msg->footer.middle_crc));
500
501         /* is there a data payload? */
502         if (le32_to_cpu(m->hdr.data_len) > 0) {
503                 /* initialize page iterator */
504                 con->out_msg_pos.page = 0;
505                 con->out_msg_pos.page_pos =
506                         le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
507                 con->out_msg_pos.data_pos = 0;
508                 con->out_msg_pos.did_page_crc = 0;
509                 con->out_more = 1;  /* data + footer will follow */
510         } else {
511                 /* no, queue up footer too and be done */
512                 prepare_write_message_footer(con, v);
513         }
514
515         set_bit(WRITE_PENDING, &con->state);
516 }
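
/*
 * The out_kvec assembled above, in order (a sketch of the on-the-wire
 * layout for one outgoing message):
 *
 *   [tag_ack][le64 acked seq]      (optional, piggybacked if in_seq grew)
 *   [tag_msg][message header][front][middle (optional)]
 *   ... data pages, written separately via kernel_sendpage() ...
 *   [message footer]               (queued once the data pages are done)
 */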
517
518 /*
519  * Prepare an ack.
520  */
521 static void prepare_write_ack(struct ceph_connection *con)
522 {
523         dout("prepare_write_ack %p %llu -> %llu\n", con,
524              con->in_seq_acked, con->in_seq);
525         con->in_seq_acked = con->in_seq;
526
527         con->out_kvec[0].iov_base = &tag_ack;
528         con->out_kvec[0].iov_len = 1;
529         con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
530         con->out_kvec[1].iov_base = &con->out_temp_ack;
531         con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
532         con->out_kvec_left = 2;
533         con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
534         con->out_kvec_cur = con->out_kvec;
535         con->out_more = 1;  /* more will follow.. eventually.. */
536         set_bit(WRITE_PENDING, &con->state);
537 }
538
539 /*
540  * Prepare to write keepalive byte.
541  */
542 static void prepare_write_keepalive(struct ceph_connection *con)
543 {
544         dout("prepare_write_keepalive %p\n", con);
545         con->out_kvec[0].iov_base = &tag_keepalive;
546         con->out_kvec[0].iov_len = 1;
547         con->out_kvec_left = 1;
548         con->out_kvec_bytes = 1;
549         con->out_kvec_cur = con->out_kvec;
550         set_bit(WRITE_PENDING, &con->state);
551 }
552
553 /*
554  * Connection negotiation.
555  */
556
557 /*
558  * We connected to a peer and are saying hello.
559  */
560 static void prepare_write_connect(struct ceph_messenger *msgr,
561                                   struct ceph_connection *con)
562 {
563         int len = strlen(CEPH_BANNER);
564         unsigned global_seq = get_global_seq(con->msgr, 0);
565         int proto;
566
567         switch (con->peer_name.type) {
568         case CEPH_ENTITY_TYPE_MON:
569                 proto = CEPH_MONC_PROTOCOL;
570                 break;
571         case CEPH_ENTITY_TYPE_OSD:
572                 proto = CEPH_OSDC_PROTOCOL;
573                 break;
574         case CEPH_ENTITY_TYPE_MDS:
575                 proto = CEPH_MDSC_PROTOCOL;
576                 break;
577         default:
578                 BUG();
579         }
580
581         dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
582              con->connect_seq, global_seq, proto);
583         con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
584         con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
585         con->out_connect.global_seq = cpu_to_le32(global_seq);
586         con->out_connect.protocol_version = cpu_to_le32(proto);
587         con->out_connect.flags = 0;
588         if (test_bit(LOSSYTX, &con->state))
589                 con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
590
591         con->out_kvec[0].iov_base = CEPH_BANNER;
592         con->out_kvec[0].iov_len = len;
593         con->out_kvec[1].iov_base = &msgr->inst.addr;
594         con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
595         con->out_kvec[2].iov_base = &con->out_connect;
596         con->out_kvec[2].iov_len = sizeof(con->out_connect);
597         con->out_kvec_left = 3;
598         con->out_kvec_bytes = len + sizeof(msgr->inst.addr) +
599                 sizeof(con->out_connect);
600         con->out_kvec_cur = con->out_kvec;
601         con->out_more = 0;
602         set_bit(WRITE_PENDING, &con->state);
603 }
604
605 static void prepare_write_connect_retry(struct ceph_messenger *msgr,
606                                         struct ceph_connection *con)
607 {
608         dout("prepare_write_connect_retry %p\n", con);
609         con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
610         con->out_connect.global_seq =
611                 cpu_to_le32(get_global_seq(con->msgr, 0));
612
613         con->out_kvec[0].iov_base = &con->out_connect;
614         con->out_kvec[0].iov_len = sizeof(con->out_connect);
615         con->out_kvec_left = 1;
616         con->out_kvec_bytes = sizeof(con->out_connect);
617         con->out_kvec_cur = con->out_kvec;
618         con->out_more = 0;
619         set_bit(WRITE_PENDING, &con->state);
620 }
621
622
623 /*
624  * write as much of pending kvecs to the socket as we can.
625  *  1 -> done
626  *  0 -> socket full, but more to do
627  * <0 -> error
628  */
629 static int write_partial_kvec(struct ceph_connection *con)
630 {
631         int ret;
632
633         dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
634         while (con->out_kvec_bytes > 0) {
635                 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
636                                        con->out_kvec_left, con->out_kvec_bytes,
637                                        con->out_more);
638                 if (ret <= 0)
639                         goto out;
640                 con->out_kvec_bytes -= ret;
641                 if (con->out_kvec_bytes == 0)
642                         break;            /* done */
643                 while (ret > 0) {
644                         if (ret >= con->out_kvec_cur->iov_len) {
645                                 ret -= con->out_kvec_cur->iov_len;
646                                 con->out_kvec_cur++;
647                                 con->out_kvec_left--;
648                         } else {
649                                 con->out_kvec_cur->iov_len -= ret;
650                                 con->out_kvec_cur->iov_base += ret;
651                                 ret = 0;
652                                 break;
653                         }
654                 }
655         }
656         con->out_kvec_left = 0;
657         con->out_kvec_is_msg = false;
658         ret = 1;
659 out:
660         dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
661              con->out_kvec_bytes, con->out_kvec_left, ret);
662         return ret;  /* done! */
663 }
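
/*
 * Worked example of the bookkeeping above: suppose out_kvec holds three
 * vectors of 1, 53 and 100 bytes (out_kvec_bytes = 154) and sendmsg
 * accepts 60 bytes.  out_kvec_bytes drops to 94, the first two vectors
 * are consumed (out_kvec_cur advances twice, out_kvec_left drops by two),
 * and the third has its iov_base advanced by 6 and iov_len reduced to 94,
 * ready for the next pass.
 */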
664
665 /*
666  * Write as much message data payload as we can.  If we finish, queue
667  * up the footer.
668  *  1 -> done, footer is now queued in out_kvec[].
669  *  0 -> socket full, but more to do
670  * <0 -> error
671  */
672 static int write_partial_msg_pages(struct ceph_connection *con)
673 {
674         struct ceph_msg *msg = con->out_msg;
675         unsigned data_len = le32_to_cpu(msg->hdr.data_len);
676         size_t len;
677         int crc = !con->msgr->nocrc;  /* compute the data crc unless the nocrc option is set */
678         int ret;
679
680         dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
681              con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
682              con->out_msg_pos.page_pos);
683
684         while (con->out_msg_pos.page < con->out_msg->nr_pages) {
685                 struct page *page = NULL;
686                 void *kaddr = NULL;
687
688                 /*
689                  * if we are calculating the data crc (the default), we need
690                  * to map the page.  if our pages[] has been revoked, use the
691                  * zero page.
692                  */
693                 if (msg->pages) {
694                         page = msg->pages[con->out_msg_pos.page];
695                         if (crc)
696                                 kaddr = kmap(page);
697                 } else {
698                         page = con->msgr->zero_page;
699                         if (crc)
700                                 kaddr = page_address(con->msgr->zero_page);
701                 }
702                 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
703                           (int)(data_len - con->out_msg_pos.data_pos));
704                 if (crc && !con->out_msg_pos.did_page_crc) {
705                         void *base = kaddr + con->out_msg_pos.page_pos;
706                         u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
707
708                         BUG_ON(kaddr == NULL);
709                         con->out_msg->footer.data_crc =
710                                 cpu_to_le32(crc32c(tmpcrc, base, len));
711                         con->out_msg_pos.did_page_crc = 1;
712                 }
713
714                 ret = kernel_sendpage(con->sock, page,
715                                       con->out_msg_pos.page_pos, len,
716                                       MSG_DONTWAIT | MSG_NOSIGNAL |
717                                       MSG_MORE);
718
719                 if (crc && msg->pages)
720                         kunmap(page);
721
722                 if (ret <= 0)
723                         goto out;
724
725                 con->out_msg_pos.data_pos += ret;
726                 con->out_msg_pos.page_pos += ret;
727                 if (ret == len) {
728                         con->out_msg_pos.page_pos = 0;
729                         con->out_msg_pos.page++;
730                         con->out_msg_pos.did_page_crc = 0;
731                 }
732         }
733
734         dout("write_partial_msg_pages %p msg %p done\n", con, msg);
735
736         /* prepare and queue up footer, too */
737         if (!crc)
738                 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
739         con->out_kvec_bytes = 0;
740         con->out_kvec_left = 0;
741         con->out_kvec_cur = con->out_kvec;
742         prepare_write_message_footer(con, 0);
743         ret = 1;
744 out:
745         return ret;
746 }
747
748 /*
749  * write some zeros
750  */
751 static int write_partial_skip(struct ceph_connection *con)
752 {
753         int ret;
754
755         while (con->out_skip > 0) {
756                 struct kvec iov = {
757                         .iov_base = page_address(con->msgr->zero_page),
758                         .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
759                 };
760
761                 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
762                 if (ret <= 0)
763                         goto out;
764                 con->out_skip -= ret;
765         }
766         ret = 1;
767 out:
768         return ret;
769 }
770
771 /*
772  * Prepare to read connection handshake, or an ack.
773  */
774 static void prepare_read_connect(struct ceph_connection *con)
775 {
776         dout("prepare_read_connect %p\n", con);
777         con->in_base_pos = 0;
778 }
779
780 static void prepare_read_ack(struct ceph_connection *con)
781 {
782         dout("prepare_read_ack %p\n", con);
783         con->in_base_pos = 0;
784 }
785
786 static void prepare_read_tag(struct ceph_connection *con)
787 {
788         dout("prepare_read_tag %p\n", con);
789         con->in_base_pos = 0;
790         con->in_tag = CEPH_MSGR_TAG_READY;
791 }
792
793 /*
794  * Prepare to read a message.
795  */
796 static int prepare_read_message(struct ceph_connection *con)
797 {
798         dout("prepare_read_message %p\n", con);
799         BUG_ON(con->in_msg != NULL);
800         con->in_base_pos = 0;
801         con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
802         return 0;
803 }
804
805
806 static int read_partial(struct ceph_connection *con,
807                         int *to, int size, void *object)
808 {
809         *to += size;
810         while (con->in_base_pos < *to) {
811                 int left = *to - con->in_base_pos;
812                 int have = size - left;
813                 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
814                 if (ret <= 0)
815                         return ret;
816                 con->in_base_pos += ret;
817         }
818         return 1;
819 }
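
/*
 * The (*to, in_base_pos) arithmetic above: *to accumulates the offset at
 * which the current object ends within this read phase, and
 * have = size - left is how much of the object has already arrived.  For
 * example, if a previous call left *to at 9 and in_base_pos at 12, a call
 * with size 16 moves *to to 25, so left = 13 and the next recvmsg() fills
 * the object starting at byte 3.
 */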
820
821
822 /*
823  * Read all or part of the connect-side handshake on a new connection
824  */
825 static int read_partial_connect(struct ceph_connection *con)
826 {
827         int ret, to = 0;
828
829         dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
830
831         /* peer's banner */
832         ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
833         if (ret <= 0)
834                 goto out;
835         ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
836                            &con->actual_peer_addr);
837         if (ret <= 0)
838                 goto out;
839         ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
840                            &con->peer_addr_for_me);
841         if (ret <= 0)
842                 goto out;
843         ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
844         if (ret <= 0)
845                 goto out;
846
847         dout("read_partial_connect %p connect_seq = %u, global_seq = %u\n",
848              con, le32_to_cpu(con->in_reply.connect_seq),
849              le32_to_cpu(con->in_reply.global_seq));
850 out:
851         return ret;
852 }
853
854 /*
855  * Verify the hello banner looks okay.
856  */
857 static int verify_hello(struct ceph_connection *con)
858 {
859         if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
860                 pr_err("connect to/from %s has bad banner\n",
861                        pr_addr(&con->peer_addr.in_addr));
862                 con->error_msg = "protocol error, bad banner";
863                 return -1;
864         }
865         return 0;
866 }
867
868 static bool addr_is_blank(struct sockaddr_storage *ss)
869 {
870         switch (ss->ss_family) {
871         case AF_INET:
872                 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
873         case AF_INET6:
874                 return
875                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
876                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
877                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
878                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
879         }
880         return false;
881 }
882
883 static int addr_port(struct sockaddr_storage *ss)
884 {
885         switch (ss->ss_family) {
886         case AF_INET:
887                 return ntohs(((struct sockaddr_in *)ss)->sin_port);
888         case AF_INET6:
889                 return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
890         }
891         return 0;
892 }
893
894 static void addr_set_port(struct sockaddr_storage *ss, int p)
895 {
896         switch (ss->ss_family) {
897         case AF_INET:
898                 ((struct sockaddr_in *)ss)->sin_port = htons(p);
899                 break;
900         case AF_INET6:
901                 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
902                 break;
903         }
902 }
903
904 /*
905  * Parse an ip[:port] list into an addr array.  Use the default
906  * monitor port if a port isn't specified.
907  */
908 int ceph_parse_ips(const char *c, const char *end,
909                    struct ceph_entity_addr *addr,
910                    int max_count, int *count)
911 {
912         int i;
913         const char *p = c;
914
915         dout("parse_ips on '%.*s'\n", (int)(end-c), c);
916         for (i = 0; i < max_count; i++) {
917                 const char *ipend;
918                 struct sockaddr_storage *ss = &addr[i].in_addr;
919                 struct sockaddr_in *in4 = (void *)ss;
920                 struct sockaddr_in6 *in6 = (void *)ss;
921                 int port;
922
923                 memset(ss, 0, sizeof(*ss));
924                 if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
925                              ',', &ipend)) {
926                         ss->ss_family = AF_INET;
927                 } else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
928                                     ',', &ipend)) {
929                         ss->ss_family = AF_INET6;
930                 } else {
931                         goto bad;
932                 }
933                 p = ipend;
934
935                 /* port? */
936                 if (p < end && *p == ':') {
937                         port = 0;
938                         p++;
939                         while (p < end && *p >= '0' && *p <= '9') {
940                                 port = (port * 10) + (*p - '0');
941                                 p++;
942                         }
943                         if (port > 65535 || port == 0)
944                                 goto bad;
945                 } else {
946                         port = CEPH_MON_PORT;
947                 }
948
949                 addr_set_port(ss, port);
950
951                 dout("parse_ips got %s\n", pr_addr(ss));
952
953                 if (p == end)
954                         break;
955                 if (*p != ',')
956                         goto bad;
957                 p++;
958         }
959
960         if (p != end)
961                 goto bad;
962
963         if (count)
964                 *count = i + 1;
965         return 0;
966
967 bad:
968         pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
969         return -EINVAL;
970 }
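
/*
 * For example, "1.2.3.4:6789,10.0.0.1" parses into two entries: the first
 * with port 6789 and the second falling back to CEPH_MON_PORT.  A
 * trailing garbage character or a port of 0 (or above 65535) makes the
 * whole list fail with -EINVAL.
 */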
971
972 static int process_connect(struct ceph_connection *con)
973 {
974         dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
975
976         if (verify_hello(con) < 0)
977                 return -1;
978
979         /*
980          * Make sure the other end is who we wanted.  note that the other
981          * end may not yet know their ip address, so if it's 0.0.0.0, give
982          * them the benefit of the doubt.
983          */
984         if (!ceph_entity_addr_is_local(&con->peer_addr,
985                                        &con->actual_peer_addr) &&
986             !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
987               con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
988                 pr_err("wrong peer, want %s/%d, "
989                        "got %s/%d, wtf\n",
990                        pr_addr(&con->peer_addr.in_addr),
991                        con->peer_addr.nonce,
992                        pr_addr(&con->actual_peer_addr.in_addr),
993                        con->actual_peer_addr.nonce);
994                 con->error_msg = "protocol error, wrong peer";
995                 return -1;
996         }
997
998         /*
999          * did we learn our address?
1000          */
1001         if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
1002                 int port = addr_port(&con->msgr->inst.addr.in_addr);
1003
1004                 memcpy(&con->msgr->inst.addr.in_addr,
1005                        &con->peer_addr_for_me.in_addr,
1006                        sizeof(con->peer_addr_for_me.in_addr));
1007                 addr_set_port(&con->msgr->inst.addr.in_addr, port);
1008                 dout("process_connect learned my addr is %s\n",
1009                      pr_addr(&con->msgr->inst.addr.in_addr));
1010         }
1011
1012         switch (con->in_reply.tag) {
1013         case CEPH_MSGR_TAG_BADPROTOVER:
1014                 dout("process_connect got BADPROTOVER my %d != their %d\n",
1015                      le32_to_cpu(con->out_connect.protocol_version),
1016                      le32_to_cpu(con->in_reply.protocol_version));
1017                 pr_err("%s%lld %s protocol version mismatch,"
1018                        " my %d != server's %d\n",
1019                        ENTITY_NAME(con->peer_name),
1020                        pr_addr(&con->peer_addr.in_addr),
1021                        le32_to_cpu(con->out_connect.protocol_version),
1022                        le32_to_cpu(con->in_reply.protocol_version));
1023                 con->error_msg = "protocol version mismatch";
1024                 if (con->ops->bad_proto)
1025                         con->ops->bad_proto(con);
1026                 reset_connection(con);
1027                 set_bit(CLOSED, &con->state);  /* in case there's queued work */
1028                 return -1;
1029
1030
1031         case CEPH_MSGR_TAG_RESETSESSION:
1032                 /*
1033                  * If we connected with a large connect_seq but the peer
1034                  * has no record of a session with us (no connection, or
1035                  * connect_seq == 0), they will send RESETSESSION to indicate
1036                  * that they must have reset their session, and may have
1037                  * dropped messages.
1038                  */
1039                 dout("process_connect got RESET peer seq %u\n",
1040                      le32_to_cpu(con->in_connect.connect_seq));
1041                 pr_err("%s%lld %s connection reset\n",
1042                        ENTITY_NAME(con->peer_name),
1043                        pr_addr(&con->peer_addr.in_addr));
1044                 reset_connection(con);
1045                 prepare_write_connect_retry(con->msgr, con);
1046                 prepare_read_connect(con);
1047
1048                 /* Tell ceph about it. */
1049                 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
1050                 if (con->ops->peer_reset)
1051                         con->ops->peer_reset(con);
1052                 break;
1053
1054         case CEPH_MSGR_TAG_RETRY_SESSION:
1055                 /*
1056                  * If we sent a smaller connect_seq than the peer has, try
1057                  * again with a larger value.
1058                  */
1059                 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
1060                      le32_to_cpu(con->out_connect.connect_seq),
1061                      le32_to_cpu(con->in_connect.connect_seq));
1062                 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1063                 prepare_write_connect_retry(con->msgr, con);
1064                 prepare_read_connect(con);
1065                 break;
1066
1067         case CEPH_MSGR_TAG_RETRY_GLOBAL:
1068                 /*
1069                  * If we sent a smaller global_seq than the peer has, try
1070                  * again with a larger value.
1071                  */
1072                 dout("process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
1073                      con->peer_global_seq,
1074                      le32_to_cpu(con->in_connect.global_seq));
1075                 get_global_seq(con->msgr,
1076                                le32_to_cpu(con->in_connect.global_seq));
1077                 prepare_write_connect_retry(con->msgr, con);
1078                 prepare_read_connect(con);
1079                 break;
1080
1081         case CEPH_MSGR_TAG_READY:
1082                 clear_bit(CONNECTING, &con->state);
1083                 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1084                         set_bit(LOSSYRX, &con->state);
1085                 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1086                 con->connect_seq++;
1087                 dout("process_connect got READY gseq %d cseq %d (%d)\n",
1088                      con->peer_global_seq,
1089                      le32_to_cpu(con->in_reply.connect_seq),
1090                      con->connect_seq);
1091                 WARN_ON(con->connect_seq !=
1092                         le32_to_cpu(con->in_reply.connect_seq));
1093
1094                 con->delay = 0;  /* reset backoff memory */
1095                 prepare_read_tag(con);
1096                 break;
1097
1098         case CEPH_MSGR_TAG_WAIT:
1099                 /*
1100                  * If there is a connection race (we are opening
1101                  * connections to each other), one of us may just have
1102                  * to WAIT.  This shouldn't happen if we are the
1103                  * client.
1104                  */
1105                 pr_err("process_connect peer connecting WAIT\n");
1106                 /* fall through */
1107         default:
1108                 pr_err("connect protocol error, will retry\n");
1109                 con->error_msg = "protocol error, garbage tag during connect";
1110                 return -1;
1111         }
1112         return 0;
1113 }
1114
1115
1116 /*
1117  * read (part of) an ack
1118  */
1119 static int read_partial_ack(struct ceph_connection *con)
1120 {
1121         int to = 0;
1122
1123         return read_partial(con, &to, sizeof(con->in_temp_ack),
1124                             &con->in_temp_ack);
1125 }
1126
1127
1128 /*
1129  * We can finally discard anything that's been acked.
1130  */
1131 static void process_ack(struct ceph_connection *con)
1132 {
1133         struct ceph_msg *m;
1134         u64 ack = le64_to_cpu(con->in_temp_ack);
1135         u64 seq;
1136
1137         mutex_lock(&con->out_mutex);
1138         while (!list_empty(&con->out_sent)) {
1139                 m = list_first_entry(&con->out_sent, struct ceph_msg,
1140                                      list_head);
1141                 seq = le64_to_cpu(m->hdr.seq);
1142                 if (seq > ack)
1143                         break;
1144                 dout("got ack for seq %llu type %d at %p\n", seq,
1145                      le16_to_cpu(m->hdr.type), m);
1146                 ceph_msg_remove(m);
1147         }
1148         mutex_unlock(&con->out_mutex);
1149         prepare_read_tag(con);
1150 }
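
/*
 * Example: if out_sent holds messages with seqs 3, 4 and 5 and the peer
 * acks 4, messages 3 and 4 are dropped (their references put) while 5
 * stays queued until a later ack covers it.
 */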
1151
1152
1153
1154
1155
1156
1157 /*
1158  * read (part of) a message.
1159  */
1160 static int read_partial_message(struct ceph_connection *con)
1161 {
1162         struct ceph_msg *m = con->in_msg;
1163         void *p;
1164         int ret;
1165         int to, want, left;
1166         unsigned front_len, middle_len, data_len, data_off;
1167         int datacrc = !con->msgr->nocrc;  /* verify the data crc unless the nocrc option is set */
1168
1169         dout("read_partial_message con %p msg %p\n", con, m);
1170
1171         /* header */
1172         while (con->in_base_pos < sizeof(con->in_hdr)) {
1173                 left = sizeof(con->in_hdr) - con->in_base_pos;
1174                 ret = ceph_tcp_recvmsg(con->sock,
1175                                        (char *)&con->in_hdr + con->in_base_pos,
1176                                        left);
1177                 if (ret <= 0)
1178                         return ret;
1179                 con->in_base_pos += ret;
1180                 if (con->in_base_pos == sizeof(con->in_hdr)) {
1181                         u32 crc = crc32c(0, (void *)&con->in_hdr,
1182                                  sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
1183                         if (crc != le32_to_cpu(con->in_hdr.crc)) {
1184                                 pr_err("read_partial_message bad hdr "
1185                                        "crc %u != expected %u\n",
1186                                        crc, le32_to_cpu(con->in_hdr.crc));
1187                                 return -EBADMSG;
1188                         }
1189                 }
1190         }
1191
1192         front_len = le32_to_cpu(con->in_hdr.front_len);
1193         if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1194                 return -EIO;
1195         middle_len = le32_to_cpu(con->in_hdr.middle_len);
1196         if (middle_len > CEPH_MSG_MAX_DATA_LEN)
1197                 return -EIO;
1198         data_len = le32_to_cpu(con->in_hdr.data_len);
1199         if (data_len > CEPH_MSG_MAX_DATA_LEN)
1200                 return -EIO;
1201
1202         /* allocate message? */
1203         if (!con->in_msg) {
1204                 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1205                      con->in_hdr.front_len, con->in_hdr.data_len);
1206                 con->in_msg = con->ops->alloc_msg(con, &con->in_hdr);
1207                 if (!con->in_msg) {
1208                         /* skip this message */
1209                         dout("alloc_msg returned NULL, skipping message\n");
1210                         con->in_base_pos = -front_len - middle_len - data_len -
1211                                 sizeof(m->footer);
1212                         con->in_tag = CEPH_MSGR_TAG_READY;
1213                         return 0;
1214                 }
1215                 if (IS_ERR(con->in_msg)) {
1216                         ret = PTR_ERR(con->in_msg);
1217                         con->in_msg = NULL;
1218                         con->error_msg = "out of memory for incoming message";
1219                         return ret;
1220                 }
1221                 m = con->in_msg;
1222                 m->front.iov_len = 0;    /* haven't read it yet */
1223                 memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
1224         }
1225
1226         /* front */
1227         while (m->front.iov_len < front_len) {
1228                 BUG_ON(m->front.iov_base == NULL);
1229                 left = front_len - m->front.iov_len;
1230                 ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
1231                                        m->front.iov_len, left);
1232                 if (ret <= 0)
1233                         return ret;
1234                 m->front.iov_len += ret;
1235                 if (m->front.iov_len == front_len)
1236                         con->in_front_crc = crc32c(0, m->front.iov_base,
1237                                                       m->front.iov_len);
1238         }
1239
1240         /* middle */
1241         while (middle_len > 0 && (!m->middle ||
1242                                   m->middle->vec.iov_len < middle_len)) {
1243                 if (m->middle == NULL) {
1244                         ret = -EOPNOTSUPP;
1245                         if (con->ops->alloc_middle)
1246                                 ret = con->ops->alloc_middle(con, m);
1247                         if (ret < 0) {
1248                                 dout("alloc_middle failed, skipping payload\n");
1249                                 con->in_base_pos = -middle_len - data_len
1250                                         - sizeof(m->footer);
1251                                 ceph_msg_put(con->in_msg);
1252                                 con->in_msg = NULL;
1253                                 con->in_tag = CEPH_MSGR_TAG_READY;
1254                                 return 0;
1255                         }
1256                         m->middle->vec.iov_len = 0;
1257                 }
1258                 left = middle_len - m->middle->vec.iov_len;
1259                 ret = ceph_tcp_recvmsg(con->sock,
1260                                        (char *)m->middle->vec.iov_base +
1261                                        m->middle->vec.iov_len, left);
1262                 if (ret <= 0)
1263                         return ret;
1264                 m->middle->vec.iov_len += ret;
1265                 if (m->middle->vec.iov_len == middle_len)
1266                         con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
1267                                                       m->middle->vec.iov_len);
1268         }
1269
1270         /* (page) data */
1271         data_off = le16_to_cpu(m->hdr.data_off);
1272         if (data_len == 0)
1273                 goto no_data;
1274
1275         if (m->nr_pages == 0) {
1276                 con->in_msg_pos.page = 0;
1277                 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
1278                 con->in_msg_pos.data_pos = 0;
1279                 /* find pages for data payload */
1280                 want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1281                 ret = -1;
1282                 if (con->ops->prepare_pages)
1283                         ret = con->ops->prepare_pages(con, m, want);
1284                 if (ret < 0) {
1285                         dout("%p prepare_pages failed, skipping payload\n", m);
1286                         con->in_base_pos = -data_len - sizeof(m->footer);
1287                         ceph_msg_put(con->in_msg);
1288                         con->in_msg = NULL;
1289                         con->in_tag = CEPH_MSGR_TAG_READY;
1290                         return 0;
1291                 }
1292                 BUG_ON(m->nr_pages < want);
1293         }
1294         while (con->in_msg_pos.data_pos < data_len) {
1295                 left = min((int)(data_len - con->in_msg_pos.data_pos),
1296                            (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1297                 BUG_ON(m->pages == NULL);
1298                 p = kmap(m->pages[con->in_msg_pos.page]);
1299                 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1300                                        left);
1301                 if (ret > 0 && datacrc)
1302                         con->in_data_crc =
1303                                 crc32c(con->in_data_crc,
1304                                           p + con->in_msg_pos.page_pos, ret);
1305                 kunmap(m->pages[con->in_msg_pos.page]);
1306                 if (ret <= 0)
1307                         return ret;
1308                 con->in_msg_pos.data_pos += ret;
1309                 con->in_msg_pos.page_pos += ret;
1310                 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1311                         con->in_msg_pos.page_pos = 0;
1312                         con->in_msg_pos.page++;
1313                 }
1314         }
1315
1316 no_data:
1317         /* footer */
1318         to = sizeof(m->hdr) + sizeof(m->footer);
1319         while (con->in_base_pos < to) {
1320                 left = to - con->in_base_pos;
1321                 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
1322                                        (con->in_base_pos - sizeof(m->hdr)),
1323                                        left);
1324                 if (ret <= 0)
1325                         return ret;
1326                 con->in_base_pos += ret;
1327         }
1328         dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1329              m, front_len, m->footer.front_crc, middle_len,
1330              m->footer.middle_crc, data_len, m->footer.data_crc);
1331
1332         /* crc ok? */
1333         if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1334                 pr_err("read_partial_message %p front crc %u != exp. %u\n",
1335                        m, con->in_front_crc, le32_to_cpu(m->footer.front_crc));
1336                 return -EBADMSG;
1337         }
1338         if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1339                 pr_err("read_partial_message %p middle crc %u != exp %u\n",
1340                        m, con->in_middle_crc, le32_to_cpu(m->footer.middle_crc));
1341                 return -EBADMSG;
1342         }
1343         if (datacrc &&
1344             (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1345             con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1346                 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1347                        con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1348                 return -EBADMSG;
1349         }
1350
1351         return 1; /* done! */
1352 }
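
/*
 * Return convention for read_partial_message(): 1 means a complete
 * message (and its crcs) has been consumed, 0 or a negative value is
 * propagated from the socket (no more data yet, EOF, or an error), and
 * -EBADMSG flags a crc mismatch.  If alloc_msg() declines the message,
 * in_base_pos is set negative so that try_read() silently drains and
 * discards the remaining payload.
 */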
1353
1354 /*
1355  * Process message.  This happens in the worker thread.  The callback should
1356  * be careful not to do anything that waits on other incoming messages or it
1357  * may deadlock.
1358  */
1359 static void process_message(struct ceph_connection *con)
1360 {
1361         struct ceph_msg *msg = con->in_msg;
1362
1363         con->in_msg = NULL;
1364
1365         /* if first message, set peer_name */
1366         if (con->peer_name.type == 0)
1367                 con->peer_name = msg->hdr.src.name;
1368
1369         mutex_lock(&con->out_mutex);
1370         con->in_seq++;
1371         mutex_unlock(&con->out_mutex);
1372
1373         dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1374              msg, le64_to_cpu(msg->hdr.seq),
1375              ENTITY_NAME(msg->hdr.src.name),
1376              le16_to_cpu(msg->hdr.type),
1377              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1378              le32_to_cpu(msg->hdr.front_len),
1379              le32_to_cpu(msg->hdr.data_len),
1380              con->in_front_crc, con->in_middle_crc, con->in_data_crc);
1381         con->ops->dispatch(con, msg);
1382         prepare_read_tag(con);
1383 }
1384
1385
1386 /*
1387  * Write something to the socket.  Called in a worker thread when the
1388  * socket appears to be writeable and we have something ready to send.
1389  */
1390 static int try_write(struct ceph_connection *con)
1391 {
1392         struct ceph_messenger *msgr = con->msgr;
1393         int ret = 1;
1394
1395         dout("try_write start %p state %lu nref %d\n", con, con->state,
1396              atomic_read(&con->nref));
1397
1398         mutex_lock(&con->out_mutex);
1399 more:
1400         dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1401
1402         /* open the socket first? */
1403         if (con->sock == NULL) {
1404                 /*
1405                  * if we were STANDBY and are reconnecting _this_
1406                  * connection, bump connect_seq now.  Always bump
1407                  * global_seq.
1408                  */
1409                 if (test_and_clear_bit(STANDBY, &con->state))
1410                         con->connect_seq++;
1411
1412                 prepare_write_connect(msgr, con);
1413                 prepare_read_connect(con);
1414                 set_bit(CONNECTING, &con->state);
1415
1416                 con->in_tag = CEPH_MSGR_TAG_READY;
1417                 dout("try_write initiating connect on %p new state %lu\n",
1418                      con, con->state);
1419                 con->sock = ceph_tcp_connect(con);
1420                 if (IS_ERR(con->sock)) {
1421                         con->sock = NULL;
1422                         con->error_msg = "connect error";
1423                         ret = -1;
1424                         goto out;
1425                 }
1426         }
1427
1428 more_kvec:
1429         /* kvec data queued? */
1430         if (con->out_skip) {
1431                 ret = write_partial_skip(con);
1432                 if (ret <= 0)
1433                         goto done;
1434                 if (ret < 0) {
1435                         dout("try_write write_partial_skip err %d\n", ret);
1436                         goto done;
1437                 }
1438         }
1439         if (con->out_kvec_left) {
1440                 ret = write_partial_kvec(con);
1441                 if (ret <= 0)
1442                         goto done;
1443                 if (ret < 0) {
1444                         dout("try_write write_partial_kvec err %d\n", ret);
1445                         goto done;
1446                 }
1447         }
1448
1449         /* msg pages? */
1450         if (con->out_msg) {
1451                 ret = write_partial_msg_pages(con);
1452                 if (ret == 1)
1453                         goto more_kvec;  /* we need to send the footer, too! */
1454                 if (ret == 0)
1455                         goto done;
1456                 if (ret < 0) {
1457                         dout("try_write write_partial_msg_pages err %d\n",
1458                              ret);
1459                         goto done;
1460                 }
1461         }
1462
1463         if (!test_bit(CONNECTING, &con->state)) {
1464                 /* is anything else pending? */
1465                 if (!list_empty(&con->out_queue)) {
1466                         prepare_write_message(con);
1467                         goto more;
1468                 }
1469                 if (con->in_seq > con->in_seq_acked) {
1470                         prepare_write_ack(con);
1471                         goto more;
1472                 }
1473                 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
1474                         prepare_write_keepalive(con);
1475                         goto more;
1476                 }
1477         }
1478
1479         /* Nothing to do! */
1480         clear_bit(WRITE_PENDING, &con->state);
1481         dout("try_write nothing else to write.\n");
1482 done:
1483         ret = 0;
1484 out:
1485         mutex_unlock(&con->out_mutex);
1486         dout("try_write done on %p\n", con);
1487         return ret;
1488 }
1489
1490
1491
1492 /*
1493  * Read what we can from the socket.
1494  */
1495 static int try_read(struct ceph_connection *con)
1496 {
1497         struct ceph_messenger *msgr;
1498         int ret = -1;
1499
1500         if (!con->sock)
1501                 return 0;
1502
1503         if (test_bit(STANDBY, &con->state))
1504                 return 0;
1505
1506         dout("try_read start on %p\n", con);
1507         msgr = con->msgr;
1508
1509 more:
1510         dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1511              con->in_base_pos);
1512         if (test_bit(CONNECTING, &con->state)) {
1513                 dout("try_read connecting\n");
1514                 ret = read_partial_connect(con);
1515                 if (ret <= 0)
1516                         goto done;
1517                 if (process_connect(con) < 0) {
1518                         ret = -1;
1519                         goto out;
1520                 }
1521                 goto more;
1522         }
1523
1524         if (con->in_base_pos < 0) {
1525                 /*
1526                  * skipping + discarding content.
1527                  *
1528                  * FIXME: there must be a better way to do this!
1529                  */
1530                 static char buf[1024];
1531                 int skip = min(1024, -con->in_base_pos);
1532                 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
1533                 ret = ceph_tcp_recvmsg(con->sock, buf, skip);
1534                 if (ret <= 0)
1535                         goto done;
1536                 con->in_base_pos += ret;
1537                 if (con->in_base_pos)
1538                         goto more;
1539         }
1540         if (con->in_tag == CEPH_MSGR_TAG_READY) {
1541                 /*
1542                  * what's next?
1543                  */
1544                 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
1545                 if (ret <= 0)
1546                         goto done;
1547                 dout("try_read got tag %d\n", (int)con->in_tag);
1548                 switch (con->in_tag) {
1549                 case CEPH_MSGR_TAG_MSG:
1550                         prepare_read_message(con);
1551                         break;
1552                 case CEPH_MSGR_TAG_ACK:
1553                         prepare_read_ack(con);
1554                         break;
1555                 case CEPH_MSGR_TAG_CLOSE:
1556                         set_bit(CLOSED, &con->state);   /* fixme */
1557                         goto done;
1558                 default:
1559                         goto bad_tag;
1560                 }
1561         }
1562         if (con->in_tag == CEPH_MSGR_TAG_MSG) {
1563                 ret = read_partial_message(con);
1564                 if (ret <= 0) {
1565                         switch (ret) {
1566                         case -EBADMSG:
1567                                 con->error_msg = "bad crc";
1568                                 ret = -EIO;
1569                                 goto out;
1570                         case -EIO:
1571                                 con->error_msg = "io error";
1572                                 goto out;
1573                         default:
1574                                 goto done;
1575                         }
1576                 }
1577                 if (con->in_tag == CEPH_MSGR_TAG_READY)
1578                         goto more;
1579                 process_message(con);
1580                 goto more;
1581         }
1582         if (con->in_tag == CEPH_MSGR_TAG_ACK) {
1583                 ret = read_partial_ack(con);
1584                 if (ret <= 0)
1585                         goto done;
1586                 process_ack(con);
1587                 goto more;
1588         }
1589
1590 done:
1591         ret = 0;
1592 out:
1593         dout("try_read done on %p\n", con);
1594         return ret;
1595
1596 bad_tag:
1597         pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
1598         con->error_msg = "protocol error, garbage tag";
1599         ret = -1;
1600         goto out;
1601 }
1602
1603
1604 /*
1605  * Atomically queue work on a connection.  Bump @con reference to
1606  * avoid races with connection teardown.
1607  *
1608  * There is some trickery going on with QUEUED and BUSY because we
1609  * only want a _single_ thread operating on each connection at any
1610  * point in time, but we want to use all available CPUs.
1611  *
1612  * The worker thread only proceeds if it can atomically set BUSY.  It
1613  * clears QUEUED and does its thing.  When it thinks it's done, it
1614  * clears BUSY, then rechecks QUEUED; if it's set again, it loops
1615  * (tries again to set BUSY).
1616  *
1617  * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
1618  * try to queue work.  If that fails (the work is already queued, or BUSY
1619  * is set), we give up, since the work is already being done or queued,
1620  * but leave QUEUED set so that the worker thread will loop if necessary.
1621  */
1622 static void queue_con(struct ceph_connection *con)
1623 {
1624         if (test_bit(DEAD, &con->state)) {
1625                 dout("queue_con %p ignoring: DEAD\n",
1626                      con);
1627                 return;
1628         }
1629
1630         if (!con->ops->get(con)) {
1631                 dout("queue_con %p ref count 0\n", con);
1632                 return;
1633         }
1634
1635         set_bit(QUEUED, &con->state);
1636         if (test_bit(BUSY, &con->state)) {
1637                 dout("queue_con %p - already BUSY\n", con);
1638                 con->ops->put(con);
1639         } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
1640                 dout("queue_con %p - already queued\n", con);
1641                 con->ops->put(con);
1642         } else {
1643                 dout("queue_con %p\n", con);
1644         }
1645 }
1646
1647 /*
1648  * Do some work on a connection.  Drop a connection ref when we're done.
1649  */
1650 static void con_work(struct work_struct *work)
1651 {
1652         struct ceph_connection *con = container_of(work, struct ceph_connection,
1653                                                    work.work);
1654         int backoff = 0;
1655
1656 more:
1657         if (test_and_set_bit(BUSY, &con->state) != 0) {
1658                 dout("con_work %p BUSY already set\n", con);
1659                 goto out;
1660         }
1661         dout("con_work %p start, clearing QUEUED\n", con);
1662         clear_bit(QUEUED, &con->state);
1663
1664         if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
1665                 dout("con_work CLOSED\n");
1666                 con_close_socket(con);
1667                 goto done;
1668         }
1669         if (test_and_clear_bit(OPENING, &con->state)) {
1670                 /* reopen w/ new peer */
1671                 dout("con_work OPENING\n");
1672                 con_close_socket(con);
1673         }
1674
1675         if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
1676             try_read(con) < 0 ||
1677             try_write(con) < 0) {
1678                 backoff = 1;
1679                 ceph_fault(con);     /* error/fault path */
1680         }
1681
1682 done:
1683         clear_bit(BUSY, &con->state);
1684         dout("con->state=%lu\n", con->state);
1685         if (test_bit(QUEUED, &con->state)) {
1686                 if (!backoff) {
1687                         dout("con_work %p QUEUED reset, looping\n", con);
1688                         goto more;
1689                 }
1690                 dout("con_work %p QUEUED reset, but just faulted\n", con);
1691                 clear_bit(QUEUED, &con->state);
1692         }
1693         dout("con_work %p done\n", con);
1694
1695 out:
1696         con->ops->put(con);
1697 }
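
/*
 * Illustrative sketch only: the QUEUED/BUSY single-runner pattern described
 * above queue_con(), reduced to its skeleton.  The names example_queue and
 * example_run, the state word and the EX_QUEUED/EX_BUSY bit numbers are
 * hypothetical; reference counting and the work_struct plumbing used by the
 * real code are omitted.
 */
#define EX_QUEUED 0
#define EX_BUSY   1

static void example_queue(unsigned long *state, struct workqueue_struct *wq,
                          struct work_struct *work)
{
        set_bit(EX_QUEUED, state);            /* record that a pass is wanted */
        if (!test_bit(EX_BUSY, state))
                queue_work(wq, work);         /* wake a worker if none running */
}

static void example_run(unsigned long *state)
{
more:
        if (test_and_set_bit(EX_BUSY, state))
                return;                       /* another thread is running */
        clear_bit(EX_QUEUED, state);
        /* ... do one pass of work ... */
        clear_bit(EX_BUSY, state);
        if (test_bit(EX_QUEUED, state))
                goto more;                    /* re-queued while we were busy */
}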
1698
1699
1700 /*
1701  * Generic error/fault handler.  A retry mechanism is used with
1702  * exponential backoff.
1703  */
1704 static void ceph_fault(struct ceph_connection *con)
1705 {
1706         pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
1707                pr_addr(&con->peer_addr.in_addr), con->error_msg);
1708         dout("fault %p state %lu to peer %s\n",
1709              con, con->state, pr_addr(&con->peer_addr.in_addr));
1710
1711         if (test_bit(LOSSYTX, &con->state)) {
1712                 dout("fault on LOSSYTX channel\n");
1713                 goto out;
1714         }
1715
1716         clear_bit(BUSY, &con->state);  /* to avoid an improbable race */
1717
1718         con_close_socket(con);
1719         con->in_msg = NULL;
1720
1721         /* If there are no messages in the queue, place the connection
1722          * in a STANDBY state (i.e., don't try to reconnect just yet). */
1723         mutex_lock(&con->out_mutex);
1724         if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
1725                 dout("fault setting STANDBY\n");
1726                 set_bit(STANDBY, &con->state);
1727                 mutex_unlock(&con->out_mutex);
1728                 goto out;
1729         }
1730
1731         /* Requeue anything that hasn't been acked, and retry after a
1732          * delay. */
1733         list_splice_init(&con->out_sent, &con->out_queue);
1734         mutex_unlock(&con->out_mutex);
1735
1736         if (con->delay == 0)
1737                 con->delay = BASE_DELAY_INTERVAL;
1738         else if (con->delay < MAX_DELAY_INTERVAL)
1739                 con->delay *= 2;
1740
1741         /* explicitly schedule work to try to reconnect again later. */
1742         dout("fault queueing %p delay %lu\n", con, con->delay);
1743         con->ops->get(con);
1744         if (queue_delayed_work(ceph_msgr_wq, &con->work,
1745                                round_jiffies_relative(con->delay)) == 0)
1746                 con->ops->put(con);
1747
1748 out:
1749         if (con->ops->fault)
1750                 con->ops->fault(con);
1751 }
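
/*
 * Illustrative sketch only: the reconnect delay progression applied above.
 * The helper name example_next_delay is hypothetical; it simply restates the
 * BASE_DELAY_INTERVAL / doubling / MAX_DELAY_INTERVAL logic in isolation.
 */
static unsigned long example_next_delay(unsigned long delay)
{
        if (delay == 0)
                return BASE_DELAY_INTERVAL;   /* first fault: short wait */
        if (delay < MAX_DELAY_INTERVAL)
                return delay * 2;             /* back off exponentially */
        return delay;                         /* already at the cap */
}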
1752
1753
1754
1755 /*
1756  * create a new messenger instance
1757  */
1758 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
1759 {
1760         struct ceph_messenger *msgr;
1761
1762         msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
1763         if (msgr == NULL)
1764                 return ERR_PTR(-ENOMEM);
1765
1766         spin_lock_init(&msgr->global_seq_lock);
1767
1768         /* the zero page is needed if a request is "canceled" while the message
1769          * is being written over the socket */
1770         msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
1771         if (!msgr->zero_page) {
1772                 kfree(msgr);
1773                 return ERR_PTR(-ENOMEM);
1774         }
1775         kmap(msgr->zero_page);
1776
1777         if (myaddr)
1778                 msgr->inst.addr = *myaddr;
1779
1780         /* select a random nonce */
1781         get_random_bytes(&msgr->inst.addr.nonce,
1782                          sizeof(msgr->inst.addr.nonce));
1783
1784         dout("messenger_create %p\n", msgr);
1785         return msgr;
1786 }
1787
1788 void ceph_messenger_destroy(struct ceph_messenger *msgr)
1789 {
1790         dout("destroy %p\n", msgr);
1791         kunmap(msgr->zero_page);
1792         __free_page(msgr->zero_page);
1793         kfree(msgr);
1794         dout("destroyed messenger %p\n", msgr);
1795 }
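
/*
 * Illustrative sketch only: typical create/use/destroy pairing for a
 * messenger.  The caller name example_messenger_lifecycle is hypothetical;
 * the error handling follows the ERR_PTR convention used by
 * ceph_messenger_create() above.
 */
static int example_messenger_lifecycle(struct ceph_entity_addr *myaddr)
{
        struct ceph_messenger *msgr;

        msgr = ceph_messenger_create(myaddr);   /* myaddr may be NULL */
        if (IS_ERR(msgr))
                return PTR_ERR(msgr);

        /* ... open connections and exchange messages ... */

        ceph_messenger_destroy(msgr);
        return 0;
}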
1796
1797 /*
1798  * Queue up an outgoing message on the given connection.
1799  */
1800 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1801 {
1802         if (test_bit(CLOSED, &con->state)) {
1803                 dout("con_send %p closed, dropping %p\n", con, msg);
1804                 ceph_msg_put(msg);
1805                 return;
1806         }
1807
1808         /* set src+dst */
1809         msg->hdr.src = con->msgr->inst;
1810         msg->hdr.orig_src = con->msgr->inst;
1811         msg->hdr.dst_erank = con->peer_addr.erank;
1812
1813         /* queue */
1814         mutex_lock(&con->out_mutex);
1815         BUG_ON(!list_empty(&msg->list_head));
1816         list_add_tail(&msg->list_head, &con->out_queue);
1817         dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
1818              ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
1819              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1820              le32_to_cpu(msg->hdr.front_len),
1821              le32_to_cpu(msg->hdr.middle_len),
1822              le32_to_cpu(msg->hdr.data_len));
1823         mutex_unlock(&con->out_mutex);
1824
1825         /* if there wasn't anything waiting to send before, queue
1826          * new work */
1827         if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
1828                 queue_con(con);
1829 }
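
/*
 * Illustrative sketch only: building and queueing an outgoing message with
 * no data pages.  The name example_send is hypothetical; note that
 * ceph_con_send() takes no extra reference, so the caller's reference is
 * handed to the connection (and dropped there if the connection is CLOSED).
 */
static int example_send(struct ceph_connection *con, int type,
                        void *payload, int payload_len)
{
        struct ceph_msg *msg;

        msg = ceph_msg_new(type, payload_len, 0, 0, NULL);
        if (IS_ERR(msg))
                return PTR_ERR(msg);

        memcpy(msg->front.iov_base, payload, payload_len);
        ceph_con_send(con, msg);
        return 0;
}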
1830
1831 /*
1832  * Revoke a message that was previously queued for send
1833  */
1834 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1835 {
1836         mutex_lock(&con->out_mutex);
1837         if (!list_empty(&msg->list_head)) {
1838                 dout("con_revoke %p msg %p\n", con, msg);
1839                 list_del_init(&msg->list_head);
1840                 ceph_msg_put(msg);
1841                 msg->hdr.seq = 0;
1842                 if (con->out_msg == msg)
1843                         con->out_msg = NULL;
1844                 if (con->out_kvec_is_msg) {
1845                         con->out_skip = con->out_kvec_bytes;
1846                         con->out_kvec_is_msg = false;
1847                 }
1848         } else {
1849                 dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
1850         }
1851         mutex_unlock(&con->out_mutex);
1852 }
1853
1854 /*
1855  * Queue a keepalive byte to ensure the tcp connection is alive.
1856  */
1857 void ceph_con_keepalive(struct ceph_connection *con)
1858 {
1859         if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
1860             test_and_set_bit(WRITE_PENDING, &con->state) == 0)
1861                 queue_con(con);
1862 }
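
/*
 * Illustrative sketch only: requesting a keepalive from a periodic caller
 * (example_tick is hypothetical).  As the bit tests above show, only the
 * first request queues work; repeated calls while a keepalive is already
 * pending are cheap no-ops.
 */
static void example_tick(struct ceph_connection *con)
{
        ceph_con_keepalive(con);
}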
1863
1864
1865 /*
1866  * Construct a new message with the given type and sizes.
1867  * The new msg has a ref count of 1.
1868  */
1869 struct ceph_msg *ceph_msg_new(int type, int front_len,
1870                               int page_len, int page_off, struct page **pages)
1871 {
1872         struct ceph_msg *m;
1873
1874         m = kmalloc(sizeof(*m), GFP_NOFS);
1875         if (m == NULL)
1876                 goto out;
1877         atomic_set(&m->nref, 1);
1878         INIT_LIST_HEAD(&m->list_head);
1879
1880         m->hdr.type = cpu_to_le16(type);
1881         m->hdr.front_len = cpu_to_le32(front_len);
1882         m->hdr.middle_len = 0;
1883         m->hdr.data_len = cpu_to_le32(page_len);
1884         m->hdr.data_off = cpu_to_le16(page_off);
1885         m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
1886         m->footer.front_crc = 0;
1887         m->footer.middle_crc = 0;
1888         m->footer.data_crc = 0;
1889         m->front_max = front_len;
1890         m->front_is_vmalloc = false;
1891         m->more_to_follow = false;
1892         m->pool = NULL;
1893
1894         /* front */
1895         if (front_len) {
1896                 if (front_len > PAGE_CACHE_SIZE) {
1897                         m->front.iov_base = __vmalloc(front_len, GFP_NOFS,
1898                                                       PAGE_KERNEL);
1899                         m->front_is_vmalloc = true;
1900                 } else {
1901                         m->front.iov_base = kmalloc(front_len, GFP_NOFS);
1902                 }
1903                 if (m->front.iov_base == NULL) {
1904                         pr_err("msg_new can't allocate %d bytes\n",
1905                              front_len);
1906                         goto out2;
1907                 }
1908         } else {
1909                 m->front.iov_base = NULL;
1910         }
1911         m->front.iov_len = front_len;
1912
1913         /* middle */
1914         m->middle = NULL;
1915
1916         /* data */
1917         m->nr_pages = calc_pages_for(page_off, page_len);
1918         m->pages = pages;
1919
1920         dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
1921              m->nr_pages);
1922         return m;
1923
1924 out2:
1925         ceph_msg_put(m);
1926 out:
1927         pr_err("msg_new can't create type %d len %d\n", type, front_len);
1928         return ERR_PTR(-ENOMEM);
1929 }
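
/*
 * Illustrative sketch only: constructing a message that carries page data in
 * addition to its front.  example_new_data_msg is hypothetical; the argument
 * order matches ceph_msg_new() above, and pages[] is assumed to cover
 * calc_pages_for(page_off, page_len) pages and remain valid until the
 * message has been written out.
 */
static struct ceph_msg *example_new_data_msg(int type, int front_len,
                                             struct page **pages,
                                             int page_off, int page_len)
{
        struct ceph_msg *msg;

        msg = ceph_msg_new(type, front_len, page_len, page_off, pages);
        if (IS_ERR(msg))
                return msg;

        /* ... fill msg->front.iov_base with the fixed-size front ... */
        return msg;
}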
1930
1931 /*
1932  * Generic message allocator, for incoming messages.
1933  */
1934 struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1935                                 struct ceph_msg_header *hdr)
1936 {
1937         int type = le16_to_cpu(hdr->type);
1938         int front_len = le32_to_cpu(hdr->front_len);
1939         struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
1940
1941         if (IS_ERR(msg)) {
1942                 pr_err("unable to allocate msg type %d len %d\n",
1943                        type, front_len);
1944                 return msg;
1945         }
1946         return msg;
1947 }
1948
1949 /*
1950  * Allocate "middle" portion of a message, if it is needed and wasn't
1951  * allocated by alloc_msg.  This allows us to read a small fixed-size
1952  * per-type header in the front and then gracefully fail (i.e.,
1953  * propagate the error to the caller based on info in the front) when
1954  * the middle is too large.
1955  */
1956 int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
1957 {
1958         int type = le16_to_cpu(msg->hdr.type);
1959         int middle_len = le32_to_cpu(msg->hdr.middle_len);
1960
1961         dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
1962              ceph_msg_type_name(type), middle_len);
1963         BUG_ON(!middle_len);
1964         BUG_ON(msg->middle);
1965
1966         msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
1967         if (!msg->middle)
1968                 return -ENOMEM;
1969         return 0;
1970 }
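
/*
 * Illustrative sketch only: how a read path might use ceph_alloc_middle()
 * once the header is known.  example_prepare_middle is hypothetical; the
 * "only if middle_len is set and nothing was preallocated" condition mirrors
 * the BUG_ON()s above.
 */
static int example_prepare_middle(struct ceph_connection *con,
                                  struct ceph_msg *msg)
{
        if (le32_to_cpu(msg->hdr.middle_len) && !msg->middle)
                return ceph_alloc_middle(con, msg);   /* may return -ENOMEM */
        return 0;
}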
1971
1972
1973 /*
1974  * Free a generically kmalloc'd message.
1975  */
1976 void ceph_msg_kfree(struct ceph_msg *m)
1977 {
1978         dout("msg_kfree %p\n", m);
1979         if (m->front_is_vmalloc)
1980                 vfree(m->front.iov_base);
1981         else
1982                 kfree(m->front.iov_base);
1983         kfree(m);
1984 }
1985
1986 /*
1987  * Drop a msg ref.  Destroy as needed.
1988  */
1989 void ceph_msg_put(struct ceph_msg *m)
1990 {
1991         dout("ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
1992              atomic_read(&m->nref)-1);
1993         if (atomic_read(&m->nref) <= 0) {
1994                 pr_err("bad ceph_msg_put on %p %llu %d=%s %d+%d\n",
1995                        m, le64_to_cpu(m->hdr.seq),
1996                        le16_to_cpu(m->hdr.type),
1997                        ceph_msg_type_name(le16_to_cpu(m->hdr.type)),
1998                        le32_to_cpu(m->hdr.front_len),
1999                        le32_to_cpu(m->hdr.data_len));
2000                 WARN_ON(1);
2001         }
2002         if (atomic_dec_and_test(&m->nref)) {
2003                 dout("ceph_msg_put last one on %p\n", m);
2004                 WARN_ON(!list_empty(&m->list_head));
2005
2006                 /* drop middle, data, if any */
2007                 if (m->middle) {
2008                         ceph_buffer_put(m->middle);
2009                         m->middle = NULL;
2010                 }
2011                 m->nr_pages = 0;
2012                 m->pages = NULL;
2013
2014                 if (m->pool)
2015                         ceph_msgpool_put(m->pool, m);
2016                 else
2017                         ceph_msg_kfree(m);
2018         }
2019 }