]> bbs.cooldavid.org Git - net-next-2.6.git/blame - fs/ceph/mds_client.c
ceph: support v2 reconnect encoding
[net-next-2.6.git] / fs / ceph / mds_client.c
CommitLineData
2f2dc053
SW
1#include "ceph_debug.h"
2
3#include <linux/wait.h>
5a0e3ad6 4#include <linux/slab.h>
2f2dc053
SW
5#include <linux/sched.h>
6
7#include "mds_client.h"
8#include "mon_client.h"
9#include "super.h"
10#include "messenger.h"
11#include "decode.h"
4e7a5dcd 12#include "auth.h"
93cea5be 13#include "pagelist.h"
2f2dc053
SW
14
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
39
20cb34ae
SW
/*
 * State threaded through the cap-reconnect encoding walk: the pagelist
 * being appended to, and whether flock records should be emitted
 * (presumably the v2 reconnect encoding — see the reconnect path).
 */
struct ceph_reconnect_state {
	struct ceph_pagelist *pagelist;
	bool flock;
};
44
2f2dc053
SW
45static void __wake_requests(struct ceph_mds_client *mdsc,
46 struct list_head *head);
47
9e32789f 48static const struct ceph_connection_operations mds_con_ops;
2f2dc053
SW
49
50
51/*
52 * mds reply parsing
53 */
54
55/*
56 * parse individual inode info
57 */
58static int parse_reply_info_in(void **p, void *end,
59 struct ceph_mds_reply_info_in *info)
60{
61 int err = -EIO;
62
63 info->in = *p;
64 *p += sizeof(struct ceph_mds_reply_inode) +
65 sizeof(*info->in->fragtree.splits) *
66 le32_to_cpu(info->in->fragtree.nsplits);
67
68 ceph_decode_32_safe(p, end, info->symlink_len, bad);
69 ceph_decode_need(p, end, info->symlink_len, bad);
70 info->symlink = *p;
71 *p += info->symlink_len;
72
73 ceph_decode_32_safe(p, end, info->xattr_len, bad);
74 ceph_decode_need(p, end, info->xattr_len, bad);
75 info->xattr_data = *p;
76 *p += info->xattr_len;
77 return 0;
78bad:
79 return err;
80}
81
/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 *
 * The trace section [*p, end) must be consumed exactly; anything left
 * over (or an overrun) is treated as a protocol error (-EIO).
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info)
{
	int err;

	if (info->head->is_dentry) {
		/* inode info for the containing directory */
		err = parse_reply_info_in(p, end, &info->diri);
		if (err < 0)
			goto out_bad;

		/* dirfrag header plus a wire-supplied replica list (ndist
		 * u32s); check bounds before and after the skip */
		if (unlikely(*p + sizeof(*info->dirfrag) > end))
			goto bad;
		info->dirfrag = *p;
		*p += sizeof(*info->dirfrag) +
			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
		if (unlikely(*p > end))
			goto bad;

		/* dentry name, then its lease record */
		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;
		info->dlease = *p;
		*p += sizeof(*info->dlease);
	}

	if (info->head->is_target) {
		/* inode info for the operation's target */
		err = parse_reply_info_in(p, end, &info->targeti);
		if (err < 0)
			goto out_bad;
	}

	/* must land exactly on the section boundary */
	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}
128
/*
 * parse readdir results
 *
 * Decodes the dirfrag header, then `num` (dname, lease, inode) triples.
 * The per-entry pointer arrays (dir_dname, dir_dname_len, dir_dlease)
 * are carved out of a single allocation headed by dir_in, which is
 * what destroy_reply_info() ultimately frees.
 */
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_info_parsed *info)
{
	u32 num, i = 0;
	int err;

	info->dir_dir = *p;
	if (*p + sizeof(*info->dir_dir) > end)
		goto bad;
	/* skip the dirfrag plus its wire-supplied replica list */
	*p += sizeof(*info->dir_dir) +
		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
	if (*p > end)
		goto bad;

	/* entry count (u32) plus the two end/complete flag bytes */
	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	info->dir_end = ceph_decode_8(p);
	info->dir_complete = ceph_decode_8(p);
	if (num == 0)
		goto done;

	/* alloc large array: one chunk holds all four per-entry arrays */
	info->dir_nr = num;
	info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
			       sizeof(*info->dir_dname) +
			       sizeof(*info->dir_dname_len) +
			       sizeof(*info->dir_dlease),
			       GFP_NOFS);
	if (info->dir_in == NULL) {
		err = -ENOMEM;
		goto out_bad;
	}
	info->dir_dname = (void *)(info->dir_in + num);
	info->dir_dname_len = (void *)(info->dir_dname + num);
	info->dir_dlease = (void *)(info->dir_dname_len + num);

	while (num) {
		/* dentry */
		ceph_decode_need(p, end, sizeof(u32)*2, bad);
		info->dir_dname_len[i] = ceph_decode_32(p);
		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
		info->dir_dname[i] = *p;
		*p += info->dir_dname_len[i];
		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
		     info->dir_dname[i]);
		/* lease follows the name; length is checked implicitly by
		 * the safe decodes inside parse_reply_info_in below */
		info->dir_dlease[i] = *p;
		*p += sizeof(struct ceph_mds_reply_lease);

		/* inode */
		err = parse_reply_info_in(p, end, &info->dir_in[i]);
		if (err < 0)
			goto out_bad;
		i++;
		num--;
	}

done:
	/* the dir section must be consumed exactly */
	if (*p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}
199
/*
 * parse entire mds reply
 *
 * Layout after the fixed ceph_mds_reply_head: three length-prefixed
 * sections — trace, dir contents, snap blob — which together must
 * consume the whole front of the message.
 */
static int parse_reply_info(struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		err = parse_reply_info_trace(&p, p+len, info);
		if (err < 0)
			goto out_bad;
	}

	/* dir content */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		err = parse_reply_info_dir(&p, p+len, info);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob: kept as a raw pointer/length for later processing */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}
246
/*
 * Free the readdir entry arrays; dir_dname/dir_dname_len/dir_dlease
 * live in the same allocation as dir_in (see parse_reply_info_dir),
 * so a single kfree releases everything.
 */
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	kfree(info->dir_in);
}
251
252
253/*
254 * sessions
255 */
256static const char *session_state_name(int s)
257{
258 switch (s) {
259 case CEPH_MDS_SESSION_NEW: return "new";
260 case CEPH_MDS_SESSION_OPENING: return "opening";
261 case CEPH_MDS_SESSION_OPEN: return "open";
262 case CEPH_MDS_SESSION_HUNG: return "hung";
263 case CEPH_MDS_SESSION_CLOSING: return "closing";
44ca18f2 264 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
2f2dc053
SW
265 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
266 default: return "???";
267 }
268}
269
270static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
271{
272 if (atomic_inc_not_zero(&s->s_ref)) {
273 dout("mdsc get_session %p %d -> %d\n", s,
274 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
275 return s;
276 } else {
277 dout("mdsc get_session %p 0 -- FAIL", s);
278 return NULL;
279 }
280}
281
282void ceph_put_mds_session(struct ceph_mds_session *s)
283{
284 dout("mdsc put_session %p %d -> %d\n", s,
285 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
4e7a5dcd
SW
286 if (atomic_dec_and_test(&s->s_ref)) {
287 if (s->s_authorizer)
288 s->s_mdsc->client->monc.auth->ops->destroy_authorizer(
289 s->s_mdsc->client->monc.auth, s->s_authorizer);
2f2dc053 290 kfree(s);
4e7a5dcd 291 }
2f2dc053
SW
292}
293
294/*
295 * called under mdsc->mutex
296 */
297struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
298 int mds)
299{
300 struct ceph_mds_session *session;
301
302 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
303 return NULL;
304 session = mdsc->sessions[mds];
305 dout("lookup_mds_session %p %d\n", session,
306 atomic_read(&session->s_ref));
307 get_session(session);
308 return session;
309}
310
311static bool __have_session(struct ceph_mds_client *mdsc, int mds)
312{
313 if (mds >= mdsc->max_sessions)
314 return false;
315 return mdsc->sessions[mds];
316}
317
2600d2dd
SW
318static int __verify_registered_session(struct ceph_mds_client *mdsc,
319 struct ceph_mds_session *s)
320{
321 if (s->s_mds >= mdsc->max_sessions ||
322 mdsc->sessions[s->s_mds] != s)
323 return -ENOENT;
324 return 0;
325}
326
2f2dc053
SW
327/*
328 * create+register a new session for given mds.
329 * called under mdsc->mutex.
330 */
331static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
332 int mds)
333{
334 struct ceph_mds_session *s;
335
336 s = kzalloc(sizeof(*s), GFP_NOFS);
4736b009
DC
337 if (!s)
338 return ERR_PTR(-ENOMEM);
2f2dc053
SW
339 s->s_mdsc = mdsc;
340 s->s_mds = mds;
341 s->s_state = CEPH_MDS_SESSION_NEW;
342 s->s_ttl = 0;
343 s->s_seq = 0;
344 mutex_init(&s->s_mutex);
345
346 ceph_con_init(mdsc->client->msgr, &s->s_con);
347 s->s_con.private = s;
348 s->s_con.ops = &mds_con_ops;
349 s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
350 s->s_con.peer_name.num = cpu_to_le64(mds);
2f2dc053
SW
351
352 spin_lock_init(&s->s_cap_lock);
353 s->s_cap_gen = 0;
354 s->s_cap_ttl = 0;
355 s->s_renew_requested = 0;
356 s->s_renew_seq = 0;
357 INIT_LIST_HEAD(&s->s_caps);
358 s->s_nr_caps = 0;
5dacf091 359 s->s_trim_caps = 0;
2f2dc053
SW
360 atomic_set(&s->s_ref, 1);
361 INIT_LIST_HEAD(&s->s_waiting);
362 INIT_LIST_HEAD(&s->s_unsafe);
363 s->s_num_cap_releases = 0;
7c1332b8 364 s->s_cap_iterator = NULL;
2f2dc053
SW
365 INIT_LIST_HEAD(&s->s_cap_releases);
366 INIT_LIST_HEAD(&s->s_cap_releases_done);
367 INIT_LIST_HEAD(&s->s_cap_flushing);
368 INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
369
370 dout("register_session mds%d\n", mds);
371 if (mds >= mdsc->max_sessions) {
372 int newmax = 1 << get_count_order(mds+1);
373 struct ceph_mds_session **sa;
374
375 dout("register_session realloc to %d\n", newmax);
376 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
377 if (sa == NULL)
42ce56e5 378 goto fail_realloc;
2f2dc053
SW
379 if (mdsc->sessions) {
380 memcpy(sa, mdsc->sessions,
381 mdsc->max_sessions * sizeof(void *));
382 kfree(mdsc->sessions);
383 }
384 mdsc->sessions = sa;
385 mdsc->max_sessions = newmax;
386 }
387 mdsc->sessions[mds] = s;
388 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
42ce56e5
SW
389
390 ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
391
2f2dc053 392 return s;
42ce56e5
SW
393
394fail_realloc:
395 kfree(s);
396 return ERR_PTR(-ENOMEM);
2f2dc053
SW
397}
398
/*
 * Remove a session from mdsc->sessions[] and close its connection.
 *
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	/* drop the reference held by the sessions[] table */
	ceph_put_mds_session(s);
}
411
412/*
413 * drop session refs in request.
414 *
415 * should be last request ref, or hold mdsc->mutex
416 */
417static void put_request_session(struct ceph_mds_request *req)
418{
419 if (req->r_session) {
420 ceph_put_mds_session(req->r_session);
421 req->r_session = NULL;
422 }
423}
424
/*
 * Final kref release for an mds request: drop every reference the
 * request held (messages, pinned inodes, dentries, session, reserved
 * caps) and free it.
 */
void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply) {
		ceph_msg_put(req->r_reply);
		/* frees the readdir arrays parsed from the reply */
		destroy_reply_info(&req->r_reply_info);
	}
	if (req->r_inode) {
		/* balance the CAP_PIN held on this request's inode */
		ceph_put_cap_refs(ceph_inode(req->r_inode),
				  CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_locked_dir)
		ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
				  CEPH_CAP_PIN);
	if (req->r_target_inode)
		iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry) {
		/* the old dentry's parent dir was pinned as well */
		ceph_put_cap_refs(
			ceph_inode(req->r_old_dentry->d_parent->d_inode),
			CEPH_CAP_PIN);
		dput(req->r_old_dentry);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	kfree(req);
}
460
461/*
462 * lookup session, bump ref if found.
463 *
464 * called under mdsc->mutex.
465 */
466static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
467 u64 tid)
468{
469 struct ceph_mds_request *req;
44ca18f2
SW
470 struct rb_node *n = mdsc->request_tree.rb_node;
471
472 while (n) {
473 req = rb_entry(n, struct ceph_mds_request, r_node);
474 if (tid < req->r_tid)
475 n = n->rb_left;
476 else if (tid > req->r_tid)
477 n = n->rb_right;
478 else {
479 ceph_mdsc_get_request(req);
480 return req;
481 }
482 }
483 return NULL;
484}
485
486static void __insert_request(struct ceph_mds_client *mdsc,
487 struct ceph_mds_request *new)
488{
489 struct rb_node **p = &mdsc->request_tree.rb_node;
490 struct rb_node *parent = NULL;
491 struct ceph_mds_request *req = NULL;
492
493 while (*p) {
494 parent = *p;
495 req = rb_entry(parent, struct ceph_mds_request, r_node);
496 if (new->r_tid < req->r_tid)
497 p = &(*p)->rb_left;
498 else if (new->r_tid > req->r_tid)
499 p = &(*p)->rb_right;
500 else
501 BUG();
502 }
503
504 rb_link_node(&new->r_node, parent, p);
505 rb_insert_color(&new->r_node, &mdsc->request_tree);
2f2dc053
SW
506}
507
/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps)
		/* pre-reserve caps the reply handling may need */
		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
				  req->r_num_caps);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	/* the request tree holds a reference */
	ceph_mdsc_get_request(req);
	__insert_request(mdsc, req);

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		/* track this request against the directory it modifies */
		spin_lock(&ci->i_unsafe_lock);
		req->r_unsafe_dir = dir;
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}
535
/*
 * Remove a request from the tid tree and the directory's unsafe list,
 * dropping the tree's reference.
 *
 * Called under mdsc->mutex (counterpart of __register_request).
 */
static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &mdsc->request_tree);
	RB_CLEAR_NODE(&req->r_node);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	ceph_mdsc_put_request(req);
}
553
/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = req->r_direct_is_hash;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("choose_mds using resend_mds mds%d\n",
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/* pick the inode (or parent dir + name hash) to steer by */
	inode = NULL;
	if (req->r_inode) {
		inode = req->r_inode;
	} else if (req->r_dentry) {
		if (req->r_dentry->d_inode) {
			inode = req->r_dentry->d_inode;
		} else {
			/* negative dentry: steer by parent dir + name hash */
			inode = req->r_dentry->d_parent->d_inode;
			hash = req->r_dentry->d_name.hash;
			is_hash = true;
		}
	}
	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
	     (int)hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		/* consult the cached frag tree for this name hash */
		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (%d/%d)\n",
				     inode, ceph_vinop(inode),
				     frag.frag, frag.mds,
				     (int)r, frag.ndist);
				return mds;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			mode = USE_AUTH_MDS;
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (auth)\n",
				     inode, ceph_vinop(inode), frag.frag, mds);
				return mds;
			}
		}
	}

	/* fall back to whichever mds issued us a cap for this inode */
	spin_lock(&inode->i_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&inode->i_lock);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&inode->i_lock);
	return mds;

random:
	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("choose_mds chose random mds%d\n", mds);
	return mds;
}
664
665
666/*
667 * session messages
668 */
669static struct ceph_msg *create_session_msg(u32 op, u64 seq)
670{
671 struct ceph_msg *msg;
672 struct ceph_mds_session_head *h;
673
34d23762 674 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
a79832f2 675 if (!msg) {
2f2dc053 676 pr_err("create_session_msg ENOMEM creating msg\n");
a79832f2 677 return NULL;
2f2dc053
SW
678 }
679 h = msg->front.iov_base;
680 h->op = cpu_to_le32(op);
681 h->seq = cpu_to_le64(seq);
682 return msg;
683}
684
685/*
686 * send session open request.
687 *
688 * called under mdsc->mutex
689 */
690static int __open_session(struct ceph_mds_client *mdsc,
691 struct ceph_mds_session *session)
692{
693 struct ceph_msg *msg;
694 int mstate;
695 int mds = session->s_mds;
2f2dc053
SW
696
697 /* wait for mds to go active? */
698 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
699 dout("open_session to mds%d (%s)\n", mds,
700 ceph_mds_state_name(mstate));
701 session->s_state = CEPH_MDS_SESSION_OPENING;
702 session->s_renew_requested = jiffies;
703
704 /* send connect message */
705 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
a79832f2
SW
706 if (!msg)
707 return -ENOMEM;
2f2dc053 708 ceph_con_send(&session->s_con, msg);
2f2dc053
SW
709 return 0;
710}
711
ed0552a1
SW
712/*
713 * open sessions for any export targets for the given mds
714 *
715 * called under mdsc->mutex
716 */
717static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
718 struct ceph_mds_session *session)
719{
720 struct ceph_mds_info *mi;
721 struct ceph_mds_session *ts;
722 int i, mds = session->s_mds;
723 int target;
724
725 if (mds >= mdsc->mdsmap->m_max_mds)
726 return;
727 mi = &mdsc->mdsmap->m_info[mds];
728 dout("open_export_target_sessions for mds%d (%d targets)\n",
729 session->s_mds, mi->num_export_targets);
730
731 for (i = 0; i < mi->num_export_targets; i++) {
732 target = mi->export_targets[i];
733 ts = __ceph_lookup_mds_session(mdsc, target);
734 if (!ts) {
735 ts = register_session(mdsc, target);
736 if (IS_ERR(ts))
737 return;
738 }
739 if (session->s_state == CEPH_MDS_SESSION_NEW ||
740 session->s_state == CEPH_MDS_SESSION_CLOSING)
741 __open_session(mdsc, session);
742 else
743 dout(" mds%d target mds%d %p is %s\n", session->s_mds,
744 i, ts, session_state_name(ts->s_state));
745 ceph_put_mds_session(ts);
746 }
747}
748
154f42c2
SW
/*
 * Public wrapper: take mdsc->mutex around
 * __open_export_target_sessions().
 */
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}
756
2f2dc053
SW
757/*
758 * session caps
759 */
760
761/*
762 * Free preallocated cap messages assigned to this session
763 */
764static void cleanup_cap_releases(struct ceph_mds_session *session)
765{
766 struct ceph_msg *msg;
767
768 spin_lock(&session->s_cap_lock);
769 while (!list_empty(&session->s_cap_releases)) {
770 msg = list_first_entry(&session->s_cap_releases,
771 struct ceph_msg, list_head);
772 list_del_init(&msg->list_head);
773 ceph_msg_put(msg);
774 }
775 while (!list_empty(&session->s_cap_releases_done)) {
776 msg = list_first_entry(&session->s_cap_releases_done,
777 struct ceph_msg, list_head);
778 list_del_init(&msg->list_head);
779 ceph_msg_put(msg);
780 }
781 spin_unlock(&session->s_cap_lock);
782}
783
/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * While the callback runs we drop s_cap_lock; s_cap_iterator marks the
 * current cap so a concurrent removal leaves it on the list for us to
 * finish tearing down (the cap->ci == NULL case below).  The inode and
 * cap from the previous iteration are released only after the lock is
 * retaken, so their destructors never run under s_cap_lock.
 *
 * Caller must hold session s_mutex.
 */
static int iterate_session_caps(struct ceph_mds_session *session,
				int (*cb)(struct inode *, struct ceph_cap *,
					  void *), void *arg)
{
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		cap = list_entry(p, struct ceph_cap, session_caps);
		/* pin the inode; skip caps whose inode is being freed */
		inode = igrab(&cap->ci->vfs_inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		/* publish the cursor so removals know to defer to us */
		session->s_cap_iterator = cap;
		spin_unlock(&session->s_cap_lock);

		/* release last iteration's inode/cap outside the lock */
		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, cap, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (cap->ci == NULL) {
			/* a racing __ceph_remove_cap() detached this cap
			 * from its inode but left the session unlink to us */
			dout("iterate_session_caps finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			cap->session = NULL;
			old_cap = cap; /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	if (last_inode)
		iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}
851
/*
 * iterate_session_caps callback: forcibly remove a cap, discarding any
 * dirty or flushing state if this was the inode's last real cap.  Each
 * discarded piece of state held an inode reference, which we drop via
 * the trailing iput() loop (outside i_lock).
 */
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int drop = 0;   /* number of inode refs to drop at the end */

	dout("removing cap %p, ci is %p, inode is %p\n",
	     cap, ci, &ci->vfs_inode);
	spin_lock(&inode->i_lock);
	__ceph_remove_cap(cap);
	if (!__ceph_is_any_real_caps(ci)) {
		struct ceph_mds_client *mdsc =
			&ceph_sb_to_client(inode->i_sb)->mdsc;

		spin_lock(&mdsc->cap_dirty_lock);
		if (!list_empty(&ci->i_dirty_item)) {
			/* dirty data can no longer be flushed; drop it */
			pr_info(" dropping dirty %s state for %p %lld\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_ino(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			drop = 1;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_info(" dropping dirty+flushing %s state for %p %lld\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_ino(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			drop = 1;
		}
		if (drop && ci->i_wrbuffer_ref) {
			/* discard buffered writes along with the caps */
			pr_info(" dropping dirty data for %p %lld\n",
				inode, ceph_ino(inode));
			ci->i_wrbuffer_ref = 0;
			ci->i_wrbuffer_ref_head = 0;
			drop++;
		}
		spin_unlock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&inode->i_lock);
	/* drop the inode refs the discarded state was holding */
	while (drop--)
		iput(inode);
	return 0;
}
898
/*
 * Tear down every cap on the session; afterwards the session must
 * hold no caps and nothing may still be flushing through it.
 *
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	dout("remove_session_caps on %p\n", session);
	iterate_session_caps(session, remove_session_caps_cb, NULL);
	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	cleanup_cap_releases(session);
}
910
911/*
912 * wake up any threads waiting on this session's caps. if the cap is
913 * old (didn't get renewed on the client reconnect), remove it now.
914 *
915 * caller must hold s_mutex.
916 */
917static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
918 void *arg)
919{
0dc2570f
SW
920 struct ceph_inode_info *ci = ceph_inode(inode);
921
03066f23 922 wake_up_all(&ci->i_cap_wq);
0dc2570f
SW
923 if (arg) {
924 spin_lock(&inode->i_lock);
925 ci->i_wanted_max_size = 0;
926 ci->i_requested_max_size = 0;
927 spin_unlock(&inode->i_lock);
928 }
2f2dc053
SW
929 return 0;
930}
931
0dc2570f
SW
932static void wake_up_session_caps(struct ceph_mds_session *session,
933 int reconnect)
2f2dc053
SW
934{
935 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
0dc2570f
SW
936 iterate_session_caps(session, wake_up_session_cb,
937 (void *)(unsigned long)reconnect);
2f2dc053
SW
938}
939
/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	/* caps went stale if they expired after our last renew attempt */
	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);
	/* record the attempt even if we end up not sending below */
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				 ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}
975
/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * The new ttl is based on when the renew was *requested*, not when the
 * ack arrived, so a slow round trip cannot extend the lease beyond
 * what the mds granted.
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && (session->s_cap_ttl == 0 ||
				 time_after_eq(jiffies, session->s_cap_ttl));

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			/* waiters blocked on stale caps can proceed now */
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
	spin_unlock(&session->s_cap_lock);

	/* wake outside s_cap_lock */
	if (wake)
		wake_up_session_caps(session, 0);
}
1010
1011/*
1012 * send a session close request
1013 */
1014static int request_close_session(struct ceph_mds_client *mdsc,
1015 struct ceph_mds_session *session)
1016{
1017 struct ceph_msg *msg;
2f2dc053
SW
1018
1019 dout("request_close_session mds%d state %s seq %lld\n",
1020 session->s_mds, session_state_name(session->s_state),
1021 session->s_seq);
1022 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
a79832f2
SW
1023 if (!msg)
1024 return -ENOMEM;
1025 ceph_con_send(&session->s_con, msg);
1026 return 0;
2f2dc053
SW
1027}
1028
/*
 * Move the session to CLOSING and ask the mds to close it; a no-op if
 * a close (or worse) is already in progress.
 *
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}
1040
/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped to.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 *
 * Returns -1 to stop the iteration once the trim quota is exhausted.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	struct ceph_mds_session *session = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, oissued, mine;

	if (session->s_trim_caps <= 0)
		return -1;

	spin_lock(&inode->i_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used));
	if (ci->i_dirty_caps)
		goto out;   /* dirty caps */
	if ((used & ~oissued) & mine)
		goto out;   /* we need these caps */

	session->s_trim_caps--;
	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap);
	} else {
		/* try to drop referring dentries */
		spin_unlock(&inode->i_lock);
		d_prune_aliases(inode);
		dout("trim_caps_cb %p cap %p pruned, count now %d\n",
		     inode, cap, atomic_read(&inode->i_count));
		return 0;
	}

out:
	spin_unlock(&inode->i_lock);
	return 0;
}
1090
1091/*
1092 * Trim session cap count down to some max number.
1093 */
1094static int trim_caps(struct ceph_mds_client *mdsc,
1095 struct ceph_mds_session *session,
1096 int max_caps)
1097{
1098 int trim_caps = session->s_nr_caps - max_caps;
1099
1100 dout("trim_caps mds%d start: %d / %d, trim %d\n",
1101 session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1102 if (trim_caps > 0) {
1103 session->s_trim_caps = trim_caps;
1104 iterate_session_caps(session, trim_caps_cb, session);
1105 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1106 session->s_mds, session->s_nr_caps, max_caps,
1107 trim_caps - session->s_trim_caps);
5dacf091 1108 session->s_trim_caps = 0;
2f2dc053
SW
1109 }
1110 return 0;
1111}
1112
1113/*
1114 * Allocate cap_release messages. If there is a partially full message
 * in the queue, try to allocate enough to cover its remainder, so that
1116 * we can send it immediately.
1117 *
1118 * Called under s_mutex.
1119 */
int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg, *partial = NULL;
	struct ceph_mds_cap_release *head;
	int err = -ENOMEM;
	int extra = mdsc->client->mount_args->cap_release_safety;
	int num;

	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
	     extra);

	spin_lock(&session->s_cap_lock);

	/* a partially filled message at the head?  allocate enough extra
	 * capacity to top it off so it can be sent right away */
	if (!list_empty(&session->s_cap_releases)) {
		msg = list_first_entry(&session->s_cap_releases,
				       struct ceph_msg,
				       list_head);
		head = msg->front.iov_base;
		num = le32_to_cpu(head->num);
		if (num) {
			dout(" partial %p with (%d/%d)\n", msg, num,
			     (int)CEPH_CAPS_PER_RELEASE);
			extra += CEPH_CAPS_PER_RELEASE - num;
			partial = msg;
		}
	}
	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
		/* drop the lock to allocate; loop condition is re-checked
		 * after it is re-taken below */
		spin_unlock(&session->s_cap_lock);
		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
				   GFP_NOFS);
		if (!msg)
			goto out_unlocked;
		dout("add_cap_releases %p msg %p now %d\n", session, msg,
		     (int)msg->front.iov_len);
		head = msg->front.iov_base;
		head->num = cpu_to_le32(0);
		msg->front.iov_len = sizeof(*head);
		spin_lock(&session->s_cap_lock);
		list_add(&msg->list_head, &session->s_cap_releases);
		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
	}

	/* move the previously-partial message to the done list so
	 * ceph_send_cap_releases() will transmit it */
	if (partial) {
		head = partial->front.iov_base;
		num = le32_to_cpu(head->num);
		dout(" queueing partial %p with %d/%d\n", partial, num,
		     (int)CEPH_CAPS_PER_RELEASE);
		list_move_tail(&partial->list_head,
			       &session->s_cap_releases_done);
		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
	}
	err = 0;
	spin_unlock(&session->s_cap_lock);
out_unlocked:
	return err;
}
1177
1178/*
1179 * flush all dirty inode data to disk.
1180 *
1181 * returns true if we've flushed through want_flush_seq
1182 */
static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
{
	int mds, ret = 1;

	dout("check_cap_flush want %lld\n", want_flush_seq);
	mutex_lock(&mdsc->mutex);
	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
		struct ceph_mds_session *session = mdsc->sessions[mds];

		if (!session)
			continue;
		/* take a session ref and drop mdsc->mutex: s_mutex must
		 * not be acquired while holding mdsc->mutex */
		get_session(session);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		if (!list_empty(&session->s_cap_flushing)) {
			/* only the head of s_cap_flushing is examined;
			 * it carries the lowest i_cap_flush_seq */
			struct ceph_inode_info *ci =
				list_entry(session->s_cap_flushing.next,
					   struct ceph_inode_info,
					   i_flushing_item);
			struct inode *inode = &ci->vfs_inode;

			spin_lock(&inode->i_lock);
			if (ci->i_cap_flush_seq <= want_flush_seq) {
				dout("check_cap_flush still flushing %p "
				     "seq %lld <= %lld to mds%d\n", inode,
				     ci->i_cap_flush_seq, want_flush_seq,
				     session->s_mds);
				ret = 0;
			}
			spin_unlock(&inode->i_lock);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		/* a flush is still pending; report back without re-taking
		 * mdsc->mutex */
		if (!ret)
			return ret;
		mutex_lock(&mdsc->mutex);
	}

	mutex_unlock(&mdsc->mutex);
	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
	return ret;
}
1227
1228/*
1229 * called under s_mutex
1230 */
3d7ded4d
SW
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
			    struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("send_cap_releases mds%d\n", session->s_mds);
	spin_lock(&session->s_cap_lock);
	while (!list_empty(&session->s_cap_releases_done)) {
		msg = list_first_entry(&session->s_cap_releases_done,
				       struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
		/* drop the lock across the send; the list head is
		 * re-tested after the lock is re-taken */
		spin_unlock(&session->s_cap_lock);
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
		spin_lock(&session->s_cap_lock);
	}
	spin_unlock(&session->s_cap_lock);
}
1250
e01a5946
SW
/*
 * Discard all queued cap release work: reset the in-progress release
 * message and recycle any completed-but-unsent messages back onto the
 * free (s_cap_releases) list.  Used when a session is being torn down
 * or re-established.  Called with s_cap_lock taken here.
 */
static void discard_cap_releases(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	struct ceph_mds_cap_release *head;
	unsigned num;

	dout("discard_cap_releases mds%d\n", session->s_mds);
	spin_lock(&session->s_cap_lock);

	/* zero out the in-progress message */
	msg = list_first_entry(&session->s_cap_releases,
			       struct ceph_msg, list_head);
	head = msg->front.iov_base;
	num = le32_to_cpu(head->num);
	dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
	head->num = cpu_to_le32(0);
	/* the discarded slots become available capacity again */
	session->s_num_cap_releases += num;

	/* requeue completed messages */
	while (!list_empty(&session->s_cap_releases_done)) {
		msg = list_first_entry(&session->s_cap_releases_done,
				       struct ceph_msg, list_head);
		list_del_init(&msg->list_head);

		head = msg->front.iov_base;
		num = le32_to_cpu(head->num);
		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
		     num);
		session->s_num_cap_releases += num;
		head->num = cpu_to_le32(0);
		msg->front.iov_len = sizeof(*head);
		list_add(&msg->list_head, &session->s_cap_releases);
	}

	spin_unlock(&session->s_cap_lock);
}
1288
2f2dc053
SW
1289/*
1290 * requests
1291 */
1292
1293/*
1294 * Create an mds request.
1295 */
1296struct ceph_mds_request *
1297ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1298{
1299 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1300
1301 if (!req)
1302 return ERR_PTR(-ENOMEM);
1303
b4556396 1304 mutex_init(&req->r_fill_mutex);
37151668 1305 req->r_mdsc = mdsc;
2f2dc053
SW
1306 req->r_started = jiffies;
1307 req->r_resend_mds = -1;
1308 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1309 req->r_fmode = -1;
153c8e6b 1310 kref_init(&req->r_kref);
2f2dc053
SW
1311 INIT_LIST_HEAD(&req->r_wait);
1312 init_completion(&req->r_completion);
1313 init_completion(&req->r_safe_completion);
1314 INIT_LIST_HEAD(&req->r_unsafe_item);
1315
1316 req->r_op = op;
1317 req->r_direct_mode = mode;
1318 return req;
1319}
1320
/*
 * Return the oldest (lowest-tid) request in the request tree, or NULL
 * if there are none.
 *
 * called under mdsc->mutex
 */
44ca18f2
SW
1326static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1327{
1328 if (RB_EMPTY_ROOT(&mdsc->request_tree))
1329 return NULL;
1330 return rb_entry(rb_first(&mdsc->request_tree),
1331 struct ceph_mds_request, r_node);
1332}
1333
2f2dc053
SW
1334static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1335{
44ca18f2
SW
1336 struct ceph_mds_request *req = __get_oldest_req(mdsc);
1337
1338 if (req)
1339 return req->r_tid;
1340 return 0;
2f2dc053
SW
1341}
1342
1343/*
1344 * Build a dentry's path. Allocate on heap; caller must kfree. Based
1345 * on build_path_from_dentry in fs/cifs/dir.c.
1346 *
1347 * If @stop_on_nosnap, generate path relative to the first non-snapped
1348 * inode.
1349 *
1350 * Encode hidden .snap dirs as a double /, i.e.
1351 * foo/.snap/bar -> foo//bar
1352 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
			   int stop_on_nosnap)
{
	struct dentry *temp;
	char *path;
	int len, pos;

	if (dentry == NULL)
		return ERR_PTR(-EINVAL);

retry:
	/* pass 1: walk toward the root to measure the path length */
	len = 0;
	for (temp = dentry; !IS_ROOT(temp);) {
		struct inode *inode = temp->d_inode;
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
			len++;  /* slash only */
		else if (stop_on_nosnap && inode &&
			 ceph_snap(inode) == CEPH_NOSNAP)
			break;
		else
			len += 1 + temp->d_name.len;
		temp = temp->d_parent;
		if (temp == NULL) {
			pr_err("build_path corrupt dentry %p\n", dentry);
			return ERR_PTR(-EINVAL);
		}
	}
	if (len)
		len--;  /* no leading '/' */

	path = kmalloc(len+1, GFP_NOFS);
	if (path == NULL)
		return ERR_PTR(-ENOMEM);
	/* pass 2: fill the buffer from the tail backwards */
	pos = len;
	path[pos] = 0;	/* trailing null */
	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
		struct inode *inode = temp->d_inode;

		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			/* hidden .snap dir is encoded as an empty
			 * component (double slash) */
			dout("build_path path+%d: %p SNAPDIR\n",
			     pos, temp);
		} else if (stop_on_nosnap && inode &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			break;
		} else {
			pos -= temp->d_name.len;
			if (pos < 0)
				break;
			strncpy(path + pos, temp->d_name.name,
				temp->d_name.len);
		}
		if (pos)
			path[--pos] = '/';
		temp = temp->d_parent;
		if (temp == NULL) {
			pr_err("build_path corrupt dentry\n");
			kfree(path);
			return ERR_PTR(-EINVAL);
		}
	}
	if (pos != 0) {
		/* pass 2 did not consume exactly what pass 1 measured */
		pr_err("build_path did not end path lookup where "
		       "expected, namelen is %d, pos is %d\n", len, pos);
		/* presumably this is only possible if racing with a
		   rename of one of the parent directories (we can not
		   lock the dentries above us to prevent this, but
		   retrying should be harmless) */
		kfree(path);
		goto retry;
	}

	*base = ceph_ino(temp->d_inode);
	*plen = len;
	dout("build_path on %p %d built %llx '%.*s'\n",
	     dentry, atomic_read(&dentry->d_count), *base, len, path);
	return path;
}
1430
1431static int build_dentry_path(struct dentry *dentry,
1432 const char **ppath, int *ppathlen, u64 *pino,
1433 int *pfreepath)
1434{
1435 char *path;
1436
1437 if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1438 *pino = ceph_ino(dentry->d_parent->d_inode);
1439 *ppath = dentry->d_name.name;
1440 *ppathlen = dentry->d_name.len;
1441 return 0;
1442 }
1443 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1444 if (IS_ERR(path))
1445 return PTR_ERR(path);
1446 *ppath = path;
1447 *pfreepath = 1;
1448 return 0;
1449}
1450
1451static int build_inode_path(struct inode *inode,
1452 const char **ppath, int *ppathlen, u64 *pino,
1453 int *pfreepath)
1454{
1455 struct dentry *dentry;
1456 char *path;
1457
1458 if (ceph_snap(inode) == CEPH_NOSNAP) {
1459 *pino = ceph_ino(inode);
1460 *ppathlen = 0;
1461 return 0;
1462 }
1463 dentry = d_find_alias(inode);
1464 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1465 dput(dentry);
1466 if (IS_ERR(path))
1467 return PTR_ERR(path);
1468 *ppath = path;
1469 *pfreepath = 1;
1470 return 0;
1471}
1472
1473/*
1474 * request arguments may be specified via an inode *, a dentry *, or
1475 * an explicit ino+path.
1476 */
1477static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1478 const char *rpath, u64 rino,
1479 const char **ppath, int *pathlen,
1480 u64 *ino, int *freepath)
1481{
1482 int r = 0;
1483
1484 if (rinode) {
1485 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1486 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1487 ceph_snap(rinode));
1488 } else if (rdentry) {
1489 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1490 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1491 *ppath);
1492 } else if (rpath) {
1493 *ino = rino;
1494 *ppath = rpath;
1495 *pathlen = strlen(rpath);
1496 dout(" path %.*s\n", *pathlen, rpath);
1497 }
1498
1499 return r;
1500}
1501
1502/*
1503 * called under mdsc->mutex
1504 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	int freepath1 = 0, freepath2 = 0;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	/* resolve the (up to) two path arguments of the request */
	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			    req->r_path1, req->r_ino1.ino,
			    &path1, &pathlen1, &ino1, &freepath1);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	ret = set_request_path_attr(NULL, req->r_old_dentry,
			    req->r_path2, req->r_ino2.ino,
			    &path2, &pathlen2, &ino2, &freepath2);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* head + two encoded filepaths (each: u8 + u32 len + u64 ino) */
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += req->r_dentry->d_name.len;
	if (req->r_old_dentry_drop)
		len += req->r_old_dentry->d_name.len;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(current_fsuid());
	head->caller_gid = cpu_to_le32(current_fsgid());
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
		       mds, req->r_dentry_drop, req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_old_dentry->d_inode,
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
	head->num_releases = cpu_to_le16(releases);

	BUG_ON(p > end);
	/* shrink front to what was actually encoded */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	/* attach any data payload pages (e.g. setxattr value) */
	msg->pages = req->r_pages;
	msg->nr_pages = req->r_num_pages;
	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		kfree((char *)path2);
out_free1:
	if (freepath1)
		kfree((char *)path1);
out:
	return msg;
}
1609
1610/*
1611 * called under mdsc->mutex if error, under no mutex if
1612 * success.
1613 */
1614static void complete_request(struct ceph_mds_client *mdsc,
1615 struct ceph_mds_request *req)
1616{
1617 if (req->r_callback)
1618 req->r_callback(mdsc, req);
1619 else
03066f23 1620 complete_all(&req->r_completion);
2f2dc053
SW
1621}
1622
1623/*
1624 * called under mdsc->mutex
1625 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_mds = mds;
	req->r_attempts++;
	/* record the cap migration seq this attempt was sent under, so a
	 * later ESTALE can tell whether the auth cap moved meanwhile */
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (req->r_got_unsafe) {
		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;
		/* truncate front at the recorded release offset */
		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
		msg->front.iov_len = req->r_request_release_offset;
		return 0;
	}

	/* not a replay: build a fresh message, dropping any old one */
	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		complete_request(mdsc, req);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (req->r_got_unsafe)
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (req->r_locked_dir)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;
	rhead->ino = 0;

	dout(" r_locked_dir = %p\n", req->r_locked_dir);
	return 0;
}
1700
1701/*
1702 * send request, or put it on the appropriate wait list.
1703 */
static int __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = -EAGAIN;

	/* already finished (or failed)?  nothing to do */
	if (req->r_err || req->r_got_result)
		goto out;

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -EIO;
		goto finish;
	}

	/* pick a target mds; park on waiting_for_map if none usable */
	mds = __choose_mds(mdsc, req);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		goto out;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	dout("do_request mds%d session %p state %s\n", mds, session,
	     session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/* session not usable yet; (maybe) initiate open and wait
		 * on the session's wait list */
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING)
			__open_session(mdsc, session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_session = get_session(session);
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __prepare_send_request(mdsc, req, mds);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

out_session:
	ceph_put_mds_session(session);
out:
	return err;

finish:
	/* terminal failure: record it and wake the caller */
	req->r_err = err;
	complete_request(mdsc, req);
	goto out;
}
1772
1773/*
1774 * called under mdsc->mutex
1775 */
1776static void __wake_requests(struct ceph_mds_client *mdsc,
1777 struct list_head *head)
1778{
1779 struct ceph_mds_request *req, *nreq;
1780
1781 list_for_each_entry_safe(req, nreq, head, r_wait) {
1782 list_del_init(&req->r_wait);
1783 __do_request(mdsc, req);
1784 }
1785}
1786
1787/*
1788 * Wake up threads with requests pending for @mds, so that they can
29790f26 1789 * resubmit their requests to a possibly different mds.
2f2dc053 1790 */
29790f26 1791static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2f2dc053 1792{
44ca18f2
SW
1793 struct ceph_mds_request *req;
1794 struct rb_node *p;
2f2dc053
SW
1795
1796 dout("kick_requests mds%d\n", mds);
44ca18f2
SW
1797 for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1798 req = rb_entry(p, struct ceph_mds_request, r_node);
1799 if (req->r_got_unsafe)
1800 continue;
1801 if (req->r_session &&
1802 req->r_session->s_mds == mds) {
1803 dout(" kicking tid %llu\n", req->r_tid);
1804 put_request_session(req);
1805 __do_request(mdsc, req);
2f2dc053
SW
1806 }
1807 }
1808}
1809
/*
 * Register an mds request and kick off delivery without waiting for
 * the reply (asynchronous submit; NULL dir, so no directory is
 * associated at registration).
 */
void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}
1819
1820/*
 * Synchronously perform an mds request.  Take care of all of the
1822 * session setup, forwarding, retry details.
1823 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_locked_dir)
		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_old_dentry)
		ceph_get_cap_refs(
			ceph_inode(req->r_old_dentry->d_parent->d_inode),
			CEPH_CAP_PIN);

	/* issue */
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);

	/* synchronous failure before anything was sent */
	if (req->r_err) {
		err = req->r_err;
		__unregister_request(mdsc, req);
		dout("do_request early error %d\n", err);
		goto out;
	}

	/* wait */
	mutex_unlock(&mdsc->mutex);
	dout("do_request waiting\n");
	if (req->r_timeout) {
		err = (long)wait_for_completion_killable_timeout(
			&req->r_completion, req->r_timeout);
		if (err == 0)
			err = -EIO;	/* timed out */
	} else {
		err = wait_for_completion_killable(&req->r_completion);
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (req->r_got_result) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		/* interrupted (killed) before a reply arrived */
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		req->r_aborted = true;
		mutex_unlock(&req->r_fill_mutex);

		/* a mutating op may have partially completed server-side;
		 * drop cached dir state we can no longer trust */
		if (req->r_locked_dir &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

out:
	mutex_unlock(&mdsc->mutex);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}
1896
167c9e35
SW
1897/*
1898 * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1899 * namespace request.
1900 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *inode = req->r_locked_dir;
	struct ceph_inode_info *ci = ceph_inode(inode);

	dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
	spin_lock(&inode->i_lock);
	/* clear the "we have the whole dir" flag and bump the release
	 * count so in-flight readdirs notice the invalidation */
	ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
	ci->i_release_count++;
	spin_unlock(&inode->i_lock);

	/* also drop leases on the dentries named by the request */
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}
1917
2f2dc053
SW
1918/*
1919 * Handle mds reply.
1920 *
1921 * We take the session mutex and parse and process the reply immediately.
1922 * This preserves the logical ordering of replies, capabilities, etc., sent
1923 * by the MDS as they are applied to our local cache.
1924 */
1925static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1926{
1927 struct ceph_mds_client *mdsc = session->s_mdsc;
1928 struct ceph_mds_request *req;
1929 struct ceph_mds_reply_head *head = msg->front.iov_base;
1930 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
1931 u64 tid;
1932 int err, result;
2600d2dd 1933 int mds = session->s_mds;
2f2dc053 1934
2f2dc053
SW
1935 if (msg->front.iov_len < sizeof(*head)) {
1936 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
9ec7cab1 1937 ceph_msg_dump(msg);
2f2dc053
SW
1938 return;
1939 }
1940
1941 /* get request, session */
6df058c0 1942 tid = le64_to_cpu(msg->hdr.tid);
2f2dc053
SW
1943 mutex_lock(&mdsc->mutex);
1944 req = __lookup_request(mdsc, tid);
1945 if (!req) {
1946 dout("handle_reply on unknown tid %llu\n", tid);
1947 mutex_unlock(&mdsc->mutex);
1948 return;
1949 }
1950 dout("handle_reply %p\n", req);
2f2dc053
SW
1951
1952 /* correct session? */
d96d6049 1953 if (req->r_session != session) {
2f2dc053
SW
1954 pr_err("mdsc_handle_reply got %llu on session mds%d"
1955 " not mds%d\n", tid, session->s_mds,
1956 req->r_session ? req->r_session->s_mds : -1);
1957 mutex_unlock(&mdsc->mutex);
1958 goto out;
1959 }
1960
1961 /* dup? */
1962 if ((req->r_got_unsafe && !head->safe) ||
1963 (req->r_got_safe && head->safe)) {
1964 pr_warning("got a dup %s reply on %llu from mds%d\n",
1965 head->safe ? "safe" : "unsafe", tid, mds);
1966 mutex_unlock(&mdsc->mutex);
1967 goto out;
1968 }
85792d0d
SW
1969 if (req->r_got_safe && !head->safe) {
1970 pr_warning("got unsafe after safe on %llu from mds%d\n",
1971 tid, mds);
1972 mutex_unlock(&mdsc->mutex);
1973 goto out;
1974 }
2f2dc053
SW
1975
1976 result = le32_to_cpu(head->result);
1977
1978 /*
e55b71f8
GF
1979 * Handle an ESTALE
1980 * if we're not talking to the authority, send to them
1981 * if the authority has changed while we weren't looking,
1982 * send to new authority
1983 * Otherwise we just have to return an ESTALE
2f2dc053
SW
1984 */
1985 if (result == -ESTALE) {
e55b71f8
GF
1986 dout("got ESTALE on request %llu", req->r_tid);
1987 if (!req->r_inode) ; //do nothing; not an authority problem
1988 else if (req->r_direct_mode != USE_AUTH_MDS) {
1989 dout("not using auth, setting for that now");
1990 req->r_direct_mode = USE_AUTH_MDS;
2f2dc053
SW
1991 __do_request(mdsc, req);
1992 mutex_unlock(&mdsc->mutex);
1993 goto out;
e55b71f8
GF
1994 } else {
1995 struct ceph_inode_info *ci = ceph_inode(req->r_inode);
1996 struct ceph_cap *cap =
1997 ceph_get_cap_for_mds(ci, req->r_mds);;
1998
1999 dout("already using auth");
2000 if ((!cap || cap != ci->i_auth_cap) ||
2001 (cap->mseq != req->r_sent_on_mseq)) {
2002 dout("but cap changed, so resending");
2003 __do_request(mdsc, req);
2004 mutex_unlock(&mdsc->mutex);
2005 goto out;
2006 }
2f2dc053 2007 }
e55b71f8 2008 dout("have to return ESTALE on request %llu", req->r_tid);
2f2dc053
SW
2009 }
2010
e55b71f8 2011
2f2dc053
SW
2012 if (head->safe) {
2013 req->r_got_safe = true;
2014 __unregister_request(mdsc, req);
03066f23 2015 complete_all(&req->r_safe_completion);
2f2dc053
SW
2016
2017 if (req->r_got_unsafe) {
2018 /*
2019 * We already handled the unsafe response, now do the
2020 * cleanup. No need to examine the response; the MDS
2021 * doesn't include any result info in the safe
2022 * response. And even if it did, there is nothing
2023 * useful we could do with a revised return value.
2024 */
2025 dout("got safe reply %llu, mds%d\n", tid, mds);
2026 list_del_init(&req->r_unsafe_item);
2027
2028 /* last unsafe request during umount? */
44ca18f2 2029 if (mdsc->stopping && !__get_oldest_req(mdsc))
03066f23 2030 complete_all(&mdsc->safe_umount_waiters);
2f2dc053
SW
2031 mutex_unlock(&mdsc->mutex);
2032 goto out;
2033 }
e1518c7c 2034 } else {
2f2dc053
SW
2035 req->r_got_unsafe = true;
2036 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2037 }
2038
2039 dout("handle_reply tid %lld result %d\n", tid, result);
2040 rinfo = &req->r_reply_info;
2041 err = parse_reply_info(msg, rinfo);
2042 mutex_unlock(&mdsc->mutex);
2043
2044 mutex_lock(&session->s_mutex);
2045 if (err < 0) {
2046 pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
9ec7cab1 2047 ceph_msg_dump(msg);
2f2dc053
SW
2048 goto out_err;
2049 }
2050
2051 /* snap trace */
2052 if (rinfo->snapblob_len) {
2053 down_write(&mdsc->snap_rwsem);
2054 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2055 rinfo->snapblob + rinfo->snapblob_len,
2056 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2057 downgrade_write(&mdsc->snap_rwsem);
2058 } else {
2059 down_read(&mdsc->snap_rwsem);
2060 }
2061
2062 /* insert trace into our cache */
b4556396 2063 mutex_lock(&req->r_fill_mutex);
2f2dc053
SW
2064 err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
2065 if (err == 0) {
2066 if (result == 0 && rinfo->dir_nr)
2067 ceph_readdir_prepopulate(req, req->r_session);
37151668 2068 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2f2dc053 2069 }
b4556396 2070 mutex_unlock(&req->r_fill_mutex);
2f2dc053
SW
2071
2072 up_read(&mdsc->snap_rwsem);
2073out_err:
e1518c7c
SW
2074 mutex_lock(&mdsc->mutex);
2075 if (!req->r_aborted) {
2076 if (err) {
2077 req->r_err = err;
2078 } else {
2079 req->r_reply = msg;
2080 ceph_msg_get(msg);
2081 req->r_got_result = true;
2082 }
2f2dc053 2083 } else {
e1518c7c 2084 dout("reply arrived after request %lld was aborted\n", tid);
2f2dc053 2085 }
e1518c7c 2086 mutex_unlock(&mdsc->mutex);
2f2dc053 2087
ee6b272b 2088 ceph_add_cap_releases(mdsc, req->r_session);
2f2dc053
SW
2089 mutex_unlock(&session->s_mutex);
2090
2091 /* kick calling process */
2092 complete_request(mdsc, req);
2093out:
2094 ceph_mdsc_put_request(req);
2095 return;
2096}
2097
2098
2099
2100/*
2101 * handle mds notification that our request has been forwarded.
2102 */
2600d2dd
SW
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;

	/* payload: destination mds + forward sequence number */
	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = __lookup_request(mdsc, tid);
	if (!req) {
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		goto out;  /* dup reply? */
	}

	if (req->r_aborted) {
		/* caller gave up on this request; just drop it */
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		/* stale forward notification; already handled */
		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(req->r_got_result);
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	ceph_mdsc_put_request(req);
out:
	mutex_unlock(&mdsc->mutex);
	return;

bad:
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
2150
/*
 * handle a mds session control message
 *
 * Processes OPEN / RENEWCAPS / CLOSE / STALE / RECALL_STATE ops from
 * the mds: updates the session state machine, renews or drops caps,
 * and wakes any requests waiting on this session.
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	u32 op;
	u64 seq;
	int mds = session->s_mds;
	struct ceph_mds_session_head *h = msg->front.iov_base;
	int wake = 0;	/* wake requests on s_waiting when done? */

	/* decode */
	if (msg->front.iov_len != sizeof(*h))
		goto bad;
	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	mutex_lock(&mdsc->mutex);
	/* unregister under mdsc->mutex, before taking s_mutex below */
	if (op == CEPH_SESSION_CLOSE)
		__unregister_session(mdsc, session);
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     session_state_name(session->s_state), seq);

	/* any message from the mds proves it is alive again */
	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect success\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_OPEN;
		renewed_caps(mdsc, session, 0);
		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		/* only honor the ack for the renew we actually sent */
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect denied\n", session->s_mds);
		remove_session_caps(session);
		wake = 1; /* for good measure */
		complete_all(&mdsc->session_close_waiters);
		kick_requests(mdsc, mds);
		break;

	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
			session->s_mds);
		spin_lock(&session->s_cap_lock);
		/* bumping s_cap_gen invalidates caps/leases from the old gen */
		session->s_cap_gen++;
		session->s_cap_ttl = 0;
		spin_unlock(&session->s_cap_lock);
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		/* mds asks us to shrink our cap cache to max_caps */
		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	default:
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		/* s_waiting is protected by mdsc->mutex, not s_mutex */
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);
	}
	return;

bad:
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
	ceph_msg_dump(msg);
	return;
}
2246
2247
2248/*
2249 * called under session->mutex.
2250 */
2251static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2252 struct ceph_mds_session *session)
2253{
2254 struct ceph_mds_request *req, *nreq;
2255 int err;
2256
2257 dout("replay_unsafe_requests mds%d\n", session->s_mds);
2258
2259 mutex_lock(&mdsc->mutex);
2260 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2261 err = __prepare_send_request(mdsc, req, session->s_mds);
2262 if (!err) {
2263 ceph_msg_get(req->r_request);
2264 ceph_con_send(&session->s_con, req->r_request);
2265 }
2266 }
2267 mutex_unlock(&mdsc->mutex);
2268}
2269
2270/*
2271 * Encode information about a cap for a reconnect with the MDS.
2272 */
2f2dc053
SW
2273static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2274 void *arg)
2275{
20cb34ae
SW
2276 union {
2277 struct ceph_mds_cap_reconnect v2;
2278 struct ceph_mds_cap_reconnect_v1 v1;
2279 } rec;
2280 size_t reclen;
2f2dc053 2281 struct ceph_inode_info *ci;
20cb34ae
SW
2282 struct ceph_reconnect_state *recon_state = arg;
2283 struct ceph_pagelist *pagelist = recon_state->pagelist;
2f2dc053
SW
2284 char *path;
2285 int pathlen, err;
2286 u64 pathbase;
2287 struct dentry *dentry;
2288
2289 ci = cap->ci;
2290
2291 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2292 inode, ceph_vinop(inode), cap, cap->cap_id,
2293 ceph_cap_string(cap->issued));
93cea5be
SW
2294 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2295 if (err)
2296 return err;
2f2dc053
SW
2297
2298 dentry = d_find_alias(inode);
2299 if (dentry) {
2300 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2301 if (IS_ERR(path)) {
2302 err = PTR_ERR(path);
2303 BUG_ON(err);
2304 }
2305 } else {
2306 path = NULL;
2307 pathlen = 0;
2308 }
93cea5be
SW
2309 err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2310 if (err)
2311 goto out;
2f2dc053 2312
2f2dc053
SW
2313 spin_lock(&inode->i_lock);
2314 cap->seq = 0; /* reset cap seq */
2315 cap->issue_seq = 0; /* and issue_seq */
20cb34ae
SW
2316
2317 if (recon_state->flock) {
2318 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2319 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2320 rec.v2.issued = cpu_to_le32(cap->issued);
2321 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2322 rec.v2.pathbase = cpu_to_le64(pathbase);
2323 rec.v2.flock_len = 0;
2324 reclen = sizeof(rec.v2);
2325 } else {
2326 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2327 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2328 rec.v1.issued = cpu_to_le32(cap->issued);
2329 rec.v1.size = cpu_to_le64(inode->i_size);
2330 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2331 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2332 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2333 rec.v1.pathbase = cpu_to_le64(pathbase);
2334 reclen = sizeof(rec.v1);
2335 }
2f2dc053
SW
2336 spin_unlock(&inode->i_lock);
2337
20cb34ae 2338 err = ceph_pagelist_append(pagelist, &rec, reclen);
93cea5be
SW
2339
2340out:
2f2dc053
SW
2341 kfree(path);
2342 dput(dentry);
93cea5be 2343 return err;
2f2dc053
SW
2344}
2345
2346
/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * called with mdsc->mutex held.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_msg *reply;
	struct rb_node *p;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_pagelist *pagelist;	/* payload: caps + snap realms */
	struct ceph_reconnect_state recon_state;

	pr_info("mds%d reconnect start\n", mds);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		goto fail_nopagelist;
	ceph_pagelist_init(pagelist);

	/* front is empty; everything rides in the pagelist data payload */
	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
	if (!reply)
		goto fail_nomsg;

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	/* (re)open the transport to the mds's current address */
	ceph_con_open(&session->s_con,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	/* hold off snap updates while we serialize realm state */
	down_read(&mdsc->snap_rwsem);

	dout("session %p state %s\n", session,
	     session_state_name(session->s_state));

	/* drop old cap expires; we're about to reestablish that state */
	discard_cap_releases(mdsc, session);

	/* traverse this session's caps */
	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
	if (err)
		goto fail;

	recon_state.pagelist = pagelist;
	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
	if (err < 0)
		goto fail;

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
			rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		dout(" adding snap realm %llx seq %lld parent %llx\n",
		     realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);
		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;
	}

	/* msg takes ownership of the pagelist from here on */
	reply->pagelist = pagelist;
	/* v2 encoding only when the peer advertised flock support */
	if (recon_state.flock)
		reply->hdr.version = cpu_to_le16(2);
	reply->hdr.data_len = cpu_to_le32(pagelist->length);
	reply->nr_pages = calc_pages_for(0, pagelist->length);
	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(pagelist);
	kfree(pagelist);
fail_nopagelist:
	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
	return;
}
2456
2457
/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.  Note that mdsc->mutex is dropped and
 * re-taken around per-session s_mutex acquisitions to preserve lock
 * ordering.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i;
	int oldstate, newstate;
	struct ceph_mds_session *s;

	dout("check_new_map new %u old %u\n",
	     newmap->m_epoch, oldmap->m_epoch);

	/* pass 1: react to state/address changes for mds ranks we know */
	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i] == NULL)
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
		     i, ceph_mds_state_name(oldstate),
		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		     ceph_mds_state_name(newstate),
		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		     session_state_name(s->s_state));

		/* did the mds's network address change? */
		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
				/* the session never opened, just close it
				 * out now */
				__wake_requests(mdsc, &s->s_waiting);
				__unregister_session(mdsc, s);
			} else {
				/* just close it */
				mutex_unlock(&mdsc->mutex);
				mutex_lock(&s->s_mutex);
				mutex_lock(&mdsc->mutex);
				ceph_con_close(&s->s_con);
				mutex_unlock(&s->s_mutex);
				s->s_state = CEPH_MDS_SESSION_RESTARTING;
			}

			/* kick any requests waiting on the recovering mds */
			kick_requests(mdsc, i);
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			/* drop mdsc->mutex: send_mds_reconnect takes s_mutex */
			mutex_unlock(&mdsc->mutex);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info("mds%d recovery completed\n", s->s_mds);
			kick_requests(mdsc, i);
			ceph_kick_flushing_caps(mdsc, s);
			wake_up_session_caps(s, 1);
		}
	}

	/* pass 2: open sessions to export targets of any laggy mds */
	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout(" connecting to export targets of laggy mds%d\n",
			     i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}
2552
2553
2554
2555/*
2556 * leases
2557 */
2558
2559/*
2560 * caller must hold session s_mutex, dentry->d_lock
2561 */
2562void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2563{
2564 struct ceph_dentry_info *di = ceph_dentry(dentry);
2565
2566 ceph_put_mds_session(di->lease_session);
2567 di->lease_session = NULL;
2568}
2569
/*
 * handle a dentry lease message (REVOKE or RENEW) from the mds.
 *
 * Locates the inode and dentry named by the message and either drops
 * the lease (acking the revoke) or extends its expiry.  Unknown or
 * unmatched targets are answered with a REVOKE_ACK reusing the same
 * message buffer.
 */
static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct super_block *sb = mdsc->client->sb;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	int mask;
	struct qstr dname;
	int release = 0;	/* send a REVOKE_ACK back? */

	dout("handle_lease from mds%d\n", mds);

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	mask = le16_to_cpu(h->mask);
	seq = le32_to_cpu(h->seq);
	/* dentry name follows the header: u32 length + bytes */
	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
	if (dname.len != get_unaligned_le32(h+1))
		goto bad;

	mutex_lock(&session->s_mutex);
	session->s_seq++;

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
	     ceph_lease_op_name(h->action), mask, vino.ino, inode,
	     dname.len, dname.name);
	if (inode == NULL) {
		dout("handle_lease no inode %llx\n", vino.ino);
		goto release;
	}
	ci = ceph_inode(inode);

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		dout("no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di && di->lease_session == session) {
			/* ack with our (newer) seq if it is ahead of the mds's */
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		/* only extend a lease we actually asked to renew */
		if (di && di->lease_session == session &&
		    di->lease_gen == session->s_cap_gen &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				le32_to_cpu(h->duration_ms) * HZ / 1000;

			di->lease_seq = seq;
			dentry->d_time = di->lease_renew_from + duration;
			/* schedule the next renew at half the lease period */
			di->lease_renew_after = di->lease_renew_from +
				(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);	/* con_send consumes a reference */
	ceph_con_send(&session->s_con, msg);

out:
	iput(inode);	/* iput(NULL) is a no-op */
	mutex_unlock(&session->s_mutex);
	return;

bad:
	pr_err("corrupt lease message\n");
	ceph_msg_dump(msg);
}
2677
2678void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2679 struct inode *inode,
2680 struct dentry *dentry, char action,
2681 u32 seq)
2682{
2683 struct ceph_msg *msg;
2684 struct ceph_mds_lease *lease;
2685 int len = sizeof(*lease) + sizeof(u32);
2686 int dnamelen = 0;
2687
2688 dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2689 inode, dentry, ceph_lease_op_name(action), session->s_mds);
2690 dnamelen = dentry->d_name.len;
2691 len += dnamelen;
2692
34d23762 2693 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
a79832f2 2694 if (!msg)
2f2dc053
SW
2695 return;
2696 lease = msg->front.iov_base;
2697 lease->action = action;
dd1c9057 2698 lease->mask = cpu_to_le16(1);
2f2dc053
SW
2699 lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2700 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2701 lease->seq = cpu_to_le32(seq);
2702 put_unaligned_le32(dnamelen, lease + 1);
2703 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2704
2705 /*
2706 * if this is a preemptive lease RELEASE, no need to
2707 * flush request stream, since the actual request will
2708 * soon follow.
2709 */
2710 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2711
2712 ceph_con_send(&session->s_con, msg);
2713}
2714
/*
 * Preemptively release a lease we expect to invalidate anyway.
 * Pass @inode always, @dentry is optional.
 *
 * Checks the lease validity under d_lock, drops it locally, then
 * notifies the mds outside the lock.
 */
void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
			     struct dentry *dentry, int mask)
{
	struct ceph_dentry_info *di;
	struct ceph_mds_session *session;
	u32 seq;

	BUG_ON(inode == NULL);
	BUG_ON(dentry == NULL);
	BUG_ON(mask == 0);

	/* is dentry lease valid? */
	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	if (!di || !di->lease_session ||
	    di->lease_session->s_mds < 0 ||
	    di->lease_gen != di->lease_session->s_cap_gen ||
	    !time_before(jiffies, dentry->d_time)) {
		dout("lease_release inode %p dentry %p -- "
		     "no lease on %d\n",
		     inode, dentry, mask);
		spin_unlock(&dentry->d_lock);
		return;
	}

	/* we do have a lease on this dentry; note mds and seq */
	session = ceph_get_mds_session(di->lease_session);
	seq = di->lease_seq;
	__ceph_mdsc_drop_dentry_lease(dentry);
	spin_unlock(&dentry->d_lock);

	/* message is sent outside d_lock; we hold a session ref instead */
	dout("lease_release inode %p dentry %p mask %d to mds%d\n",
	     inode, dentry, mask, session->s_mds);
	ceph_mdsc_lease_send_msg(session, inode, dentry,
				 CEPH_MDS_LEASE_RELEASE, seq);
	ceph_put_mds_session(session);
}
2756
2757/*
2758 * drop all leases (and dentry refs) in preparation for umount
2759 */
2760static void drop_leases(struct ceph_mds_client *mdsc)
2761{
2762 int i;
2763
2764 dout("drop_leases\n");
2765 mutex_lock(&mdsc->mutex);
2766 for (i = 0; i < mdsc->max_sessions; i++) {
2767 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2768 if (!s)
2769 continue;
2770 mutex_unlock(&mdsc->mutex);
2771 mutex_lock(&s->s_mutex);
2772 mutex_unlock(&s->s_mutex);
2773 ceph_put_mds_session(s);
2774 mutex_lock(&mdsc->mutex);
2775 }
2776 mutex_unlock(&mdsc->mutex);
2777}
2778
2779
2780
2781/*
2782 * delayed work -- periodically trim expired leases, renew caps with mds
2783 */
2784static void schedule_delayed(struct ceph_mds_client *mdsc)
2785{
2786 int delay = 5;
2787 unsigned hz = round_jiffies_relative(HZ * delay);
2788 schedule_delayed_work(&mdsc->delayed_work, hz);
2789}
2790
/*
 * Periodic housekeeping: flush delayed caps, re-request stuck session
 * closes, mark unresponsive sessions HUNG, renew caps (or just send a
 * keepalive), and push out queued cap releases.  Re-arms itself via
 * schedule_delayed() at the end.
 */
static void delayed_work(struct work_struct *work)
{
	int i;
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	int renew_interval;
	int renew_caps;		/* time to renew caps this pass? */

	dout("mdsc delayed_work\n");
	ceph_check_delayed_caps(mdsc);

	mutex_lock(&mdsc->mutex);
	/* renew caps every quarter of the session timeout */
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (s == NULL)
			continue;
		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout("resending session close request for mds%d\n",
			     s->s_mds);
			request_close_session(mdsc, s);
			ceph_put_mds_session(s);
			continue;
		}
		/* no traffic within the ttl window: declare the mds hung */
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
				s->s_state = CEPH_MDS_SESSION_HUNG;
				pr_info("mds%d hung\n", s->s_mds);
			}
		}
		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
			/* this mds is failed or recovering, just wait */
			ceph_put_mds_session(s);
			continue;
		}
		/* drop mdsc->mutex before taking the session mutex */
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		ceph_add_cap_releases(mdsc, s);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	schedule_delayed(mdsc);
}
2851
2852
/*
 * Initialize the mds client state embedded in @client.
 *
 * Allocates an empty initial mdsmap and sets up all locks, lists,
 * trees, completions, and the delayed-work timer.  Returns 0 or
 * -ENOMEM if the mdsmap allocation fails.
 */
int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
{
	mdsc->client = client;
	mutex_init(&mdsc->mutex);
	/* start with an empty map; a real one arrives from the monitor */
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (mdsc->mdsmap == NULL)
		return -ENOMEM;

	init_completion(&mdsc->safe_umount_waiters);
	init_completion(&mdsc->session_close_waiters);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->sessions = NULL;		/* grown on demand */
	mdsc->max_sessions = 0;
	mdsc->stopping = 0;
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->last_tid = 0;
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->cap_flush_seq = 0;
	INIT_LIST_HEAD(&mdsc->cap_dirty);
	mdsc->num_cap_flushing = 0;
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	spin_lock_init(&mdsc->dentry_lru_lock);
	INIT_LIST_HEAD(&mdsc->dentry_lru);

	ceph_caps_init(mdsc);
	ceph_adjust_min_caps(mdsc, client->min_caps);

	return 0;
}
2892
/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_request *req;
	struct ceph_client *client = mdsc->client;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		/* bounded wait; umount must not hang forever */
		dout("wait_requests waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    client->mount_args->mount_timeout * HZ);

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			dout("wait_requests timed out on tid %llu\n",
			     req->r_tid);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_requests done\n");
}
2921
/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 *
 * Drops leases, flushes dirty caps, waits out (or tears down) open
 * requests, and flushes the messenger so reply handlers release their
 * refs.
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	dout("pre_umount\n");
	mdsc->stopping = 1;	/* new sessions will be closed on OPEN */

	drop_leases(mdsc);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();
}
2941
/*
 * wait for all write mds requests to flush.
 *
 * Walks the request rbtree in tid order up to @want_tid, blocking on
 * each write op's r_safe_completion.  The next node is pinned with a
 * reference across the wait; if it was removed from the tree in the
 * meantime, the walk restarts.
 */
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
{
	struct ceph_mds_request *req = NULL, *nextreq;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	dout("wait_unsafe_requests want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			/* drop the mutex while we sleep */
			mutex_unlock(&mdsc->mutex);
			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);
			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break;  /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq);  /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_unsafe_requests done\n");
}
2986
/*
 * Flush everything for sync(2): snapshot the current tid and cap flush
 * seq, flush dirty caps, then wait until all write requests up to that
 * tid are safe and all cap flushes up to that seq have been acked.
 */
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	u64 want_tid, want_flush;

	/* nothing to flush if the mount is already being torn down */
	if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
		return;

	dout("sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	want_flush = mdsc->cap_flush_seq;
	mutex_unlock(&mdsc->mutex);
	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	ceph_flush_dirty_caps(mdsc);

	wait_unsafe_requests(mdsc, want_tid);
	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
}
3006
3007
/*
 * called after sb is ro.
 *
 * Politely asks every mds to close its session, waiting (bounded by
 * the mount timeout) for the CLOSE acks; then forcibly unregisters
 * whatever is left and cancels the delayed-work timer.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int i;
	int n;	/* sessions still open this pass */
	struct ceph_client *client = mdsc->client;
	unsigned long started, timeout = client->mount_args->mount_timeout * HZ;

	dout("close_sessions\n");

	mutex_lock(&mdsc->mutex);

	/* close sessions */
	started = jiffies;
	while (time_before(jiffies, started + timeout)) {
		dout("closing sessions\n");
		n = 0;
		for (i = 0; i < mdsc->max_sessions; i++) {
			session = __ceph_lookup_mds_session(mdsc, i);
			if (!session)
				continue;
			/* drop mdsc->mutex around the per-session mutex */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			__close_session(mdsc, session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
			n++;
		}
		if (n == 0)
			break;

		if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
			break;

		/* signaled by handle_session() on CEPH_SESSION_CLOSE */
		dout("waiting for sessions to close\n");
		mutex_unlock(&mdsc->mutex);
		wait_for_completion_timeout(&mdsc->session_close_waiters,
					    timeout);
		mutex_lock(&mdsc->mutex);
	}

	/* tear down remaining sessions */
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = get_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}

	WARN_ON(!list_empty(&mdsc->cap_delay_list));

	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_empty_realms(mdsc);

	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	dout("stopped\n");
}
3077
3078void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3079{
3080 dout("stop\n");
3081 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3082 if (mdsc->mdsmap)
3083 ceph_mdsmap_destroy(mdsc->mdsmap);
3084 kfree(mdsc->sessions);
37151668 3085 ceph_caps_finalize(mdsc);
2f2dc053
SW
3086}
3087
3088
/*
 * handle mds map update.
 *
 * Verifies the fsid, skips maps no newer than the one we hold, decodes
 * the new map, swaps it into place (diffing session state via
 * check_new_map), and wakes requests waiting for a map.
 */
void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	/* jumps to 'bad' if the header is short */
	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->client, &fsid) < 0)
		return;		/* wrong cluster; silently ignore */
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		dout("handle_map epoch %u <= our %u\n",
		     epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(&p, end);
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;

	__wake_requests(mdsc, &mdsc->waiting_for_map);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err("error decoding mdsmap %d\n", err);
	return;
}
3149
3150static struct ceph_connection *con_get(struct ceph_connection *con)
3151{
3152 struct ceph_mds_session *s = con->private;
3153
3154 if (get_session(s)) {
2600d2dd 3155 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
2f2dc053
SW
3156 return con;
3157 }
3158 dout("mdsc con_get %p FAIL\n", s);
3159 return NULL;
3160}
3161
3162static void con_put(struct ceph_connection *con)
3163{
3164 struct ceph_mds_session *s = con->private;
3165
2f2dc053 3166 ceph_put_mds_session(s);
2600d2dd 3167 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
2f2dc053
SW
3168}
3169
3170/*
3171 * if the client is unresponsive for long enough, the mds will kill
3172 * the session entirely.
3173 */
3174static void peer_reset(struct ceph_connection *con)
3175{
3176 struct ceph_mds_session *s = con->private;
7e70f0ed 3177 struct ceph_mds_client *mdsc = s->s_mdsc;
2f2dc053 3178
7e70f0ed
SW
3179 pr_warning("mds%d closed our session\n", s->s_mds);
3180 send_mds_reconnect(mdsc, s);
2f2dc053
SW
3181}
3182
3183static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3184{
3185 struct ceph_mds_session *s = con->private;
3186 struct ceph_mds_client *mdsc = s->s_mdsc;
3187 int type = le16_to_cpu(msg->hdr.type);
3188
2600d2dd
SW
3189 mutex_lock(&mdsc->mutex);
3190 if (__verify_registered_session(mdsc, s) < 0) {
3191 mutex_unlock(&mdsc->mutex);
3192 goto out;
3193 }
3194 mutex_unlock(&mdsc->mutex);
3195
2f2dc053
SW
3196 switch (type) {
3197 case CEPH_MSG_MDS_MAP:
3198 ceph_mdsc_handle_map(mdsc, msg);
3199 break;
3200 case CEPH_MSG_CLIENT_SESSION:
3201 handle_session(s, msg);
3202 break;
3203 case CEPH_MSG_CLIENT_REPLY:
3204 handle_reply(s, msg);
3205 break;
3206 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2600d2dd 3207 handle_forward(mdsc, s, msg);
2f2dc053
SW
3208 break;
3209 case CEPH_MSG_CLIENT_CAPS:
3210 ceph_handle_caps(s, msg);
3211 break;
3212 case CEPH_MSG_CLIENT_SNAP:
2600d2dd 3213 ceph_handle_snap(mdsc, s, msg);
2f2dc053
SW
3214 break;
3215 case CEPH_MSG_CLIENT_LEASE:
2600d2dd 3216 handle_lease(mdsc, s, msg);
2f2dc053
SW
3217 break;
3218
3219 default:
3220 pr_err("received unknown message type %d %s\n", type,
3221 ceph_msg_type_name(type));
3222 }
2600d2dd 3223out:
2f2dc053
SW
3224 ceph_msg_put(msg);
3225}
3226
4e7a5dcd
SW
3227/*
3228 * authentication
3229 */
3230static int get_authorizer(struct ceph_connection *con,
3231 void **buf, int *len, int *proto,
3232 void **reply_buf, int *reply_len, int force_new)
3233{
3234 struct ceph_mds_session *s = con->private;
3235 struct ceph_mds_client *mdsc = s->s_mdsc;
3236 struct ceph_auth_client *ac = mdsc->client->monc.auth;
3237 int ret = 0;
3238
3239 if (force_new && s->s_authorizer) {
3240 ac->ops->destroy_authorizer(ac, s->s_authorizer);
3241 s->s_authorizer = NULL;
3242 }
3243 if (s->s_authorizer == NULL) {
3244 if (ac->ops->create_authorizer) {
3245 ret = ac->ops->create_authorizer(
3246 ac, CEPH_ENTITY_TYPE_MDS,
3247 &s->s_authorizer,
3248 &s->s_authorizer_buf,
3249 &s->s_authorizer_buf_len,
3250 &s->s_authorizer_reply_buf,
3251 &s->s_authorizer_reply_buf_len);
3252 if (ret)
3253 return ret;
3254 }
3255 }
3256
3257 *proto = ac->protocol;
3258 *buf = s->s_authorizer_buf;
3259 *len = s->s_authorizer_buf_len;
3260 *reply_buf = s->s_authorizer_reply_buf;
3261 *reply_len = s->s_authorizer_reply_buf_len;
3262 return 0;
3263}
3264
3265
3266static int verify_authorizer_reply(struct ceph_connection *con, int len)
3267{
3268 struct ceph_mds_session *s = con->private;
3269 struct ceph_mds_client *mdsc = s->s_mdsc;
3270 struct ceph_auth_client *ac = mdsc->client->monc.auth;
3271
3272 return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3273}
3274
9bd2e6f8
SW
3275static int invalidate_authorizer(struct ceph_connection *con)
3276{
3277 struct ceph_mds_session *s = con->private;
3278 struct ceph_mds_client *mdsc = s->s_mdsc;
3279 struct ceph_auth_client *ac = mdsc->client->monc.auth;
3280
3281 if (ac->ops->invalidate_authorizer)
3282 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3283
3284 return ceph_monc_validate_auth(&mdsc->client->monc);
3285}
3286
9e32789f 3287static const struct ceph_connection_operations mds_con_ops = {
2f2dc053
SW
3288 .get = con_get,
3289 .put = con_put,
3290 .dispatch = dispatch,
4e7a5dcd
SW
3291 .get_authorizer = get_authorizer,
3292 .verify_authorizer_reply = verify_authorizer_reply,
9bd2e6f8 3293 .invalidate_authorizer = invalidate_authorizer,
2f2dc053 3294 .peer_reset = peer_reset,
2f2dc053
SW
3295};
3296
3297
3298
3299
3300/* eof */