]> bbs.cooldavid.org Git - net-next-2.6.git/blame - fs/ceph/mon_client.c
ceph: handle errors during osd client init
[net-next-2.6.git] / fs / ceph / mon_client.c
CommitLineData
ba75bb98
SW
1#include "ceph_debug.h"
2
3#include <linux/types.h>
4#include <linux/random.h>
5#include <linux/sched.h>
6
7#include "mon_client.h"
8#include "super.h"
9#include "decode.h"
10
11/*
12 * Interact with Ceph monitor cluster. Handle requests for new map
13 * versions, and periodically resend as needed. Also implement
14 * statfs() and umount().
15 *
16 * A small cluster of Ceph "monitors" are responsible for managing critical
17 * cluster configuration and state information. An odd number (e.g., 3, 5)
18 * of cmon daemons use a modified version of the Paxos part-time parliament
19 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
20 * list of clients who have mounted the file system.
21 *
22 * We maintain an open, active session with a monitor at all times in order to
23 * receive timely MDSMap updates. We periodically send a keepalive byte on the
24 * TCP socket to ensure we detect a failure. If the connection does break, we
25 * randomly hunt for a new monitor. Once the connection is reestablished, we
26 * resend any outstanding requests.
27 */
28
29const static struct ceph_connection_operations mon_con_ops;
30
31/*
32 * Decode a monmap blob (e.g., during mount).
33 */
34struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
35{
36 struct ceph_monmap *m = NULL;
37 int i, err = -EINVAL;
38 struct ceph_fsid fsid;
39 u32 epoch, num_mon;
40 u16 version;
41
42 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
43
44 ceph_decode_16_safe(&p, end, version, bad);
45
46 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
47 ceph_decode_copy(&p, &fsid, sizeof(fsid));
c89136ea 48 epoch = ceph_decode_32(&p);
ba75bb98 49
c89136ea 50 num_mon = ceph_decode_32(&p);
ba75bb98
SW
51 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
52
53 if (num_mon >= CEPH_MAX_MON)
54 goto bad;
55 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
56 if (m == NULL)
57 return ERR_PTR(-ENOMEM);
58 m->fsid = fsid;
59 m->epoch = epoch;
60 m->num_mon = num_mon;
61 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
63f2d211
SW
62 for (i = 0; i < num_mon; i++)
63 ceph_decode_addr(&m->mon_inst[i].addr);
ba75bb98 64
ba75bb98
SW
65 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
66 m->num_mon);
67 for (i = 0; i < m->num_mon; i++)
68 dout("monmap_decode mon%d is %s\n", i,
69 pr_addr(&m->mon_inst[i].addr.in_addr));
70 return m;
71
72bad:
73 dout("monmap_decode failed with %d\n", err);
74 kfree(m);
75 return ERR_PTR(err);
76}
77
78/*
79 * return true if *addr is included in the monmap.
80 */
81int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
82{
83 int i;
84
85 for (i = 0; i < m->num_mon; i++)
86 if (ceph_entity_addr_equal(addr, &m->mon_inst[i].addr))
87 return 1;
88 return 0;
89}
90
91/*
92 * Close monitor session, if any.
93 */
94static void __close_session(struct ceph_mon_client *monc)
95{
96 if (monc->con) {
97 dout("__close_session closing mon%d\n", monc->cur_mon);
98 ceph_con_close(monc->con);
99 monc->cur_mon = -1;
100 }
101}
102
103/*
104 * Open a session with a (new) monitor.
105 */
106static int __open_session(struct ceph_mon_client *monc)
107{
108 char r;
109
110 if (monc->cur_mon < 0) {
111 get_random_bytes(&r, 1);
112 monc->cur_mon = r % monc->monmap->num_mon;
113 dout("open_session num=%d r=%d -> mon%d\n",
114 monc->monmap->num_mon, r, monc->cur_mon);
115 monc->sub_sent = 0;
116 monc->sub_renew_after = jiffies; /* i.e., expired */
117 monc->want_next_osdmap = !!monc->want_next_osdmap;
118
119 dout("open_session mon%d opening\n", monc->cur_mon);
120 monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
121 monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
122 ceph_con_open(monc->con,
123 &monc->monmap->mon_inst[monc->cur_mon].addr);
124 } else {
125 dout("open_session mon%d already open\n", monc->cur_mon);
126 }
127 return 0;
128}
129
130static bool __sub_expired(struct ceph_mon_client *monc)
131{
132 return time_after_eq(jiffies, monc->sub_renew_after);
133}
134
135/*
136 * Reschedule delayed work timer.
137 */
138static void __schedule_delayed(struct ceph_mon_client *monc)
139{
140 unsigned delay;
141
142 if (monc->cur_mon < 0 || monc->want_mount || __sub_expired(monc))
143 delay = 10 * HZ;
144 else
145 delay = 20 * HZ;
146 dout("__schedule_delayed after %u\n", delay);
147 schedule_delayed_work(&monc->delayed_work, delay);
148}
149
150/*
151 * Send subscribe request for mdsmap and/or osdmap.
152 */
153static void __send_subscribe(struct ceph_mon_client *monc)
154{
155 dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
156 (unsigned)monc->sub_sent, __sub_expired(monc),
157 monc->want_next_osdmap);
158 if ((__sub_expired(monc) && !monc->sub_sent) ||
159 monc->want_next_osdmap == 1) {
160 struct ceph_msg *msg;
161 struct ceph_mon_subscribe_item *i;
162 void *p, *end;
163
164 msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 64, 0, 0, NULL);
165 if (!msg)
166 return;
167
168 p = msg->front.iov_base;
169 end = p + msg->front.iov_len;
170
171 dout("__send_subscribe to 'mdsmap' %u+\n",
172 (unsigned)monc->have_mdsmap);
173 if (monc->want_next_osdmap) {
174 dout("__send_subscribe to 'osdmap' %u\n",
175 (unsigned)monc->have_osdmap);
176 ceph_encode_32(&p, 2);
177 ceph_encode_string(&p, end, "osdmap", 6);
178 i = p;
179 i->have = cpu_to_le64(monc->have_osdmap);
180 i->onetime = 1;
181 p += sizeof(*i);
182 monc->want_next_osdmap = 2; /* requested */
183 } else {
184 ceph_encode_32(&p, 1);
185 }
186 ceph_encode_string(&p, end, "mdsmap", 6);
187 i = p;
188 i->have = cpu_to_le64(monc->have_mdsmap);
189 i->onetime = 0;
190 p += sizeof(*i);
191
192 msg->front.iov_len = p - msg->front.iov_base;
193 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
194 ceph_con_send(monc->con, msg);
195
196 monc->sub_sent = jiffies | 1; /* never 0 */
197 }
198}
199
/*
 * Handle the monitor's SUBSCRIBE_ACK, which grants our map
 * subscriptions for h->duration seconds.  Receiving any ack also
 * proves the session is alive, so it ends an in-progress monitor hunt.
 */
static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	/* reject a short (corrupt) payload before reading any field */
	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		pr_info("mon%d %s session established\n",
			monc->cur_mon, pr_addr(&monc->con->peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	/* schedule renewal at roughly half the granted duration, measured
	 * from when we sent the request, so we refresh well before expiry */
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;	/* nothing outstanding any more */
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
}
224
225/*
226 * Keep track of which maps we have
227 */
228int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
229{
230 mutex_lock(&monc->mutex);
231 monc->have_mdsmap = got;
232 mutex_unlock(&monc->mutex);
233 return 0;
234}
235
236int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
237{
238 mutex_lock(&monc->mutex);
239 monc->have_osdmap = got;
240 monc->want_next_osdmap = 0;
241 mutex_unlock(&monc->mutex);
242 return 0;
243}
244
245/*
246 * Register interest in the next osdmap
247 */
248void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
249{
250 dout("request_next_osdmap have %u\n", monc->have_osdmap);
251 mutex_lock(&monc->mutex);
252 if (!monc->want_next_osdmap)
253 monc->want_next_osdmap = 1;
254 if (monc->want_next_osdmap < 2)
255 __send_subscribe(monc);
256 mutex_unlock(&monc->mutex);
257}
258
259
/*
 * Send a MOUNT request to the current monitor, opening a session
 * first if necessary.  Caller holds monc->mutex.  Failures are
 * silently dropped here; the delayed work timer retries the mount.
 */
static void __request_mount(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg;
	struct ceph_client_mount *h;
	int err;

	dout("__request_mount\n");
	err = __open_session(monc);
	if (err)
		return;
	msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL);
	if (IS_ERR(msg))
		return;
	h = msg->front.iov_base;
	/* NOTE(review): session_mon = -1 presumably means "not bound to a
	 * particular monitor session" -- verify against the mon protocol */
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	ceph_con_send(monc->con, msg);
}
282
283int ceph_monc_request_mount(struct ceph_mon_client *monc)
284{
285 if (!monc->con) {
286 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
287 if (!monc->con)
288 return -ENOMEM;
289 ceph_con_init(monc->client->msgr, monc->con);
290 monc->con->private = monc;
291 monc->con->ops = &mon_con_ops;
292 }
293
294 mutex_lock(&monc->mutex);
295 __request_mount(monc);
296 __schedule_delayed(monc);
297 mutex_unlock(&monc->mutex);
298 return 0;
299}
300
301/*
302 * The monitor responds with mount ack indicate mount success. The
303 * included client ticket allows the client to talk to MDSs and OSDs.
304 */
305static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
306{
307 struct ceph_client *client = monc->client;
308 struct ceph_monmap *monmap = NULL, *old = monc->monmap;
309 void *p, *end;
310 s32 result;
311 u32 len;
312 s64 cnum;
313 int err = -EINVAL;
314
315 if (client->whoami >= 0) {
316 dout("handle_mount_ack - already mounted\n");
317 return;
318 }
319
320 mutex_lock(&monc->mutex);
321
322 dout("handle_mount_ack\n");
323 p = msg->front.iov_base;
324 end = p + msg->front.iov_len;
325
326 ceph_decode_64_safe(&p, end, cnum, bad);
327 ceph_decode_32_safe(&p, end, result, bad);
328 ceph_decode_32_safe(&p, end, len, bad);
329 if (result) {
330 pr_err("mount denied: %.*s (%d)\n", len, (char *)p,
331 result);
332 err = result;
333 goto out;
334 }
335 p += len;
336
337 ceph_decode_32_safe(&p, end, len, bad);
338 ceph_decode_need(&p, end, len, bad);
339 monmap = ceph_monmap_decode(p, p + len);
340 if (IS_ERR(monmap)) {
341 pr_err("problem decoding monmap, %d\n",
342 (int)PTR_ERR(monmap));
343 err = -EINVAL;
344 goto out;
345 }
346 p += len;
347
348 client->monc.monmap = monmap;
349 kfree(old);
350
351 client->signed_ticket = NULL;
352 client->signed_ticket_len = 0;
353
354 monc->want_mount = false;
355
356 client->whoami = cnum;
357 client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
358 client->msgr->inst.name.num = cpu_to_le64(cnum);
359 pr_info("client%lld fsid " FSID_FORMAT "\n",
360 client->whoami, PR_FSID(&client->monc.monmap->fsid));
361
362 ceph_debugfs_client_init(client);
363 __send_subscribe(monc);
364
365 err = 0;
366 goto out;
367
368bad:
369 pr_err("error decoding mount_ack message\n");
370out:
371 client->mount_err = err;
372 mutex_unlock(&monc->mutex);
373 wake_up(&client->mount_wq);
374}
375
376
377
378
/*
 * Handle a STATFS_REPLY: look up the pending request by tid, copy the
 * returned statistics into the waiter's buffer, and complete it.
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_statfs_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid;

	/* fixed-size reply; anything else is corrupt */
	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	tid = le64_to_cpu(reply->tid);
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = radix_tree_lookup(&monc->statfs_request_tree, tid);
	if (req) {
		*req->buf = reply->st;
		req->result = 0;
	}
	mutex_unlock(&monc->mutex);
	/* NOTE(review): complete() runs after the mutex is dropped; req
	 * lives on the waiter's stack.  If the waiter is interrupted and
	 * unregisters between our lookup and this complete(), this looks
	 * like a use-after-free window -- confirm against the waiter in
	 * ceph_monc_do_statfs() */
	if (req)
		complete(&req->completion);
	return;

bad:
	pr_err("corrupt statfs reply, no tid\n");
}
408
/*
 * (re)send a statfs request
 *
 * Caller holds monc->mutex.  Returns 0 on success or a negative
 * errno if the session could not be opened or the message could not
 * be allocated.
 */
static int send_statfs(struct ceph_mon_client *monc,
		       struct ceph_mon_statfs_request *req)
{
	struct ceph_msg *msg;
	struct ceph_mon_statfs *h;
	int err;

	dout("send_statfs tid %llu\n", req->tid);
	/* make sure we have an open monitor session first */
	err = __open_session(monc);
	if (err)
		return err;
	msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	req->request = msg;	/* remember it for resend */
	h = msg->front.iov_base;
	/* NOTE(review): session_mon = -1 presumably means "any monitor
	 * session" -- verify against the mon protocol */
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->tid = cpu_to_le64(req->tid);
	ceph_con_send(monc->con, msg);
	return 0;
}
436
437/*
438 * Do a synchronous statfs().
439 */
440int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
441{
442 struct ceph_mon_statfs_request req;
443 int err;
444
445 req.buf = buf;
446 init_completion(&req.completion);
447
448 /* allocate memory for reply */
449 err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1);
450 if (err)
451 return err;
452
453 /* register request */
454 mutex_lock(&monc->mutex);
455 req.tid = ++monc->last_tid;
456 req.last_attempt = jiffies;
457 req.delay = BASE_DELAY_INTERVAL;
458 if (radix_tree_insert(&monc->statfs_request_tree, req.tid, &req) < 0) {
459 mutex_unlock(&monc->mutex);
460 pr_err("ENOMEM in do_statfs\n");
461 return -ENOMEM;
462 }
463 monc->num_statfs_requests++;
464 mutex_unlock(&monc->mutex);
465
466 /* send request and wait */
467 err = send_statfs(monc, &req);
468 if (!err)
469 err = wait_for_completion_interruptible(&req.completion);
470
471 mutex_lock(&monc->mutex);
472 radix_tree_delete(&monc->statfs_request_tree, req.tid);
473 monc->num_statfs_requests--;
474 ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1);
475 mutex_unlock(&monc->mutex);
476
477 if (!err)
478 err = req.result;
479 return err;
480}
481
482/*
483 * Resend pending statfs requests.
484 */
485static void __resend_statfs(struct ceph_mon_client *monc)
486{
487 u64 next_tid = 0;
488 int got;
489 int did = 0;
490 struct ceph_mon_statfs_request *req;
491
492 while (1) {
493 got = radix_tree_gang_lookup(&monc->statfs_request_tree,
494 (void **)&req,
495 next_tid, 1);
496 if (got == 0)
497 break;
498 did++;
499 next_tid = req->tid + 1;
500
501 send_statfs(monc, req);
502 }
503}
504
/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->want_mount) {
		/* mount not acked yet; resend the request */
		__request_mount(monc);
	} else {
		if (monc->hunting) {
			/* still no ack from the last monitor we tried;
			 * close it and try another (picked at random) */
			__close_session(monc);
			__open_session(monc);  /* continue hunting */
		} else {
			/* session healthy; just keep the TCP link alive */
			ceph_con_keepalive(monc->con);
		}
	}
	__send_subscribe(monc);
	__schedule_delayed(monc);	/* rearm ourselves */
	mutex_unlock(&monc->mutex);
}
531
6b805185
SW
532/*
533 * On startup, we build a temporary monmap populated with the IPs
534 * provided by mount(2).
535 */
536static int build_initial_monmap(struct ceph_mon_client *monc)
537{
538 struct ceph_mount_args *args = monc->client->mount_args;
539 struct ceph_entity_addr *mon_addr = args->mon_addr;
540 int num_mon = args->num_mon;
541 int i;
542
543 /* build initial monmap */
544 monc->monmap = kzalloc(sizeof(*monc->monmap) +
545 num_mon*sizeof(monc->monmap->mon_inst[0]),
546 GFP_KERNEL);
547 if (!monc->monmap)
548 return -ENOMEM;
549 for (i = 0; i < num_mon; i++) {
550 monc->monmap->mon_inst[i].addr = mon_addr[i];
551 monc->monmap->mon_inst[i].addr.erank = 0;
552 monc->monmap->mon_inst[i].addr.nonce = 0;
553 monc->monmap->mon_inst[i].name.type =
554 CEPH_ENTITY_TYPE_MON;
555 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
556 }
557 monc->monmap->num_mon = num_mon;
558
559 /* release addr memory */
560 kfree(args->mon_addr);
561 args->mon_addr = NULL;
562 args->num_mon = 0;
563 return 0;
564}
565
ba75bb98
SW
566int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
567{
568 int err = 0;
569
570 dout("init\n");
571 memset(monc, 0, sizeof(*monc));
572 monc->client = cl;
573 monc->monmap = NULL;
574 mutex_init(&monc->mutex);
575
6b805185
SW
576 err = build_initial_monmap(monc);
577 if (err)
578 goto out;
579
ba75bb98
SW
580 monc->con = NULL;
581
582 /* msg pools */
583 err = ceph_msgpool_init(&monc->msgpool_mount_ack, 4096, 1, false);
584 if (err < 0)
585 goto out;
07bd10fb
SW
586 err = ceph_msgpool_init(&monc->msgpool_subscribe_ack,
587 sizeof(struct ceph_mon_subscribe_ack), 1, false);
ba75bb98
SW
588 if (err < 0)
589 goto out;
590 err = ceph_msgpool_init(&monc->msgpool_statfs_reply,
591 sizeof(struct ceph_mon_statfs_reply), 0, false);
592 if (err < 0)
593 goto out;
594
595 monc->cur_mon = -1;
596 monc->hunting = false; /* not really */
597 monc->sub_renew_after = jiffies;
598 monc->sub_sent = 0;
599
600 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
601 INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
602 monc->num_statfs_requests = 0;
603 monc->last_tid = 0;
604
605 monc->have_mdsmap = 0;
606 monc->have_osdmap = 0;
607 monc->want_next_osdmap = 1;
608 monc->want_mount = true;
609out:
610 return err;
611}
612
/*
 * Tear down the mon_client: stop the retry timer, close the monitor
 * session, release the connection, and free the pools and monmap.
 */
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	/* cancel before taking the mutex; delayed_work() takes it too */
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	if (monc->con) {
		/* clear ->private so callbacks (dispatch/mon_fault) see
		 * we are gone */
		monc->con->private = NULL;
		monc->con->ops->put(monc->con);
		monc->con = NULL;
	}
	mutex_unlock(&monc->mutex);

	ceph_msgpool_destroy(&monc->msgpool_mount_ack);
	ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
	ceph_msgpool_destroy(&monc->msgpool_statfs_reply);

	kfree(monc->monmap);
}
633
634
635/*
636 * handle incoming message
637 */
638static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
639{
640 struct ceph_mon_client *monc = con->private;
641 int type = le16_to_cpu(msg->hdr.type);
642
643 if (!monc)
644 return;
645
646 switch (type) {
647 case CEPH_MSG_CLIENT_MOUNT_ACK:
648 handle_mount_ack(monc, msg);
649 break;
650
651 case CEPH_MSG_MON_SUBSCRIBE_ACK:
652 handle_subscribe_ack(monc, msg);
653 break;
654
655 case CEPH_MSG_STATFS_REPLY:
656 handle_statfs_reply(monc, msg);
657 break;
658
659 case CEPH_MSG_MDS_MAP:
660 ceph_mdsc_handle_map(&monc->client->mdsc, msg);
661 break;
662
663 case CEPH_MSG_OSD_MAP:
664 ceph_osdc_handle_map(&monc->client->osdc, msg);
665 break;
666
667 default:
668 pr_err("received unknown message type %d %s\n", type,
669 ceph_msg_type_name(type));
670 }
671 ceph_msg_put(msg);
672}
673
674/*
675 * Allocate memory for incoming message
676 */
677static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
678 struct ceph_msg_header *hdr)
679{
680 struct ceph_mon_client *monc = con->private;
681 int type = le16_to_cpu(hdr->type);
8f3bc053 682 int front = le32_to_cpu(hdr->front_len);
ba75bb98
SW
683
684 switch (type) {
685 case CEPH_MSG_CLIENT_MOUNT_ACK:
8f3bc053 686 return ceph_msgpool_get(&monc->msgpool_mount_ack, front);
ba75bb98 687 case CEPH_MSG_MON_SUBSCRIBE_ACK:
8f3bc053 688 return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front);
ba75bb98 689 case CEPH_MSG_STATFS_REPLY:
8f3bc053 690 return ceph_msgpool_get(&monc->msgpool_statfs_reply, front);
ba75bb98
SW
691 }
692 return ceph_alloc_msg(con, hdr);
693}
694
/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	if (!monc)
		return;

	dout("mon_fault\n");
	mutex_lock(&monc->mutex);
	/* re-check under the mutex: ceph_monc_stop() clears ->private */
	if (!con->private)
		goto out;

	if (monc->con && !monc->hunting)
		pr_info("mon%d %s session lost, "
			"hunting for new mon\n", monc->cur_mon,
			pr_addr(&monc->con->peer_addr.in_addr));

	__close_session(monc);
	if (!monc->hunting) {
		/* start hunting: __open_session() picks a new random
		 * monitor, then we re-issue subscriptions and any
		 * pending statfs requests on the fresh session */
		monc->hunting = true;
		if (__open_session(monc) == 0) {
			__send_subscribe(monc);
			__resend_statfs(monc);
		}
	} else {
		/* already hunting, let's wait a bit */
		__schedule_delayed(monc);
	}
out:
	mutex_unlock(&monc->mutex);
}
731
732const static struct ceph_connection_operations mon_con_ops = {
733 .get = ceph_con_get,
734 .put = ceph_con_put,
735 .dispatch = dispatch,
736 .fault = mon_fault,
737 .alloc_msg = mon_alloc_msg,
738 .alloc_middle = ceph_alloc_middle,
739};