bbs.cooldavid.org Git - net-next-2.6.git/blobdiff - fs/ceph/caps.c
ceph: fix xattr cap writeback
[net-next-2.6.git] / fs / ceph / caps.c
index ae3e3a3064451f7b49fedaa3931120ff1af6d169..0ac2703f3bdf0c973b1ba2a3f61fa3b0d1c6f46e 100644 (file)
@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
        return cap_str[i];
 }
 
-/*
- * Cap reservations
- *
- * Maintain a global pool of preallocated struct ceph_caps, referenced
- * by struct ceph_caps_reservations.  This ensures that we preallocate
- * memory needed to successfully process an MDS response.  (If an MDS
- * sends us cap information and we fail to process it, we will have
- * problems due to the client and MDS being out of sync.)
- *
- * Reservations are 'owned' by a ceph_cap_reservation context.
- */
-static spinlock_t caps_list_lock;
-static struct list_head caps_list;  /* unused (reserved or unreserved) */
-static int caps_total_count;        /* total caps allocated */
-static int caps_use_count;          /* in use */
-static int caps_reserve_count;      /* unused, reserved */
-static int caps_avail_count;        /* unused, unreserved */
-static int caps_min_count;          /* keep at least this many (unreserved) */
-
-void __init ceph_caps_init(void)
+void ceph_caps_init(struct ceph_mds_client *mdsc)
 {
-       INIT_LIST_HEAD(&caps_list);
-       spin_lock_init(&caps_list_lock);
+       INIT_LIST_HEAD(&mdsc->caps_list);
+       spin_lock_init(&mdsc->caps_list_lock);
 }
 
-void ceph_caps_finalize(void)
+void ceph_caps_finalize(struct ceph_mds_client *mdsc)
 {
        struct ceph_cap *cap;
 
-       spin_lock(&caps_list_lock);
-       while (!list_empty(&caps_list)) {
-               cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+       spin_lock(&mdsc->caps_list_lock);
+       while (!list_empty(&mdsc->caps_list)) {
+               cap = list_first_entry(&mdsc->caps_list,
+                                      struct ceph_cap, caps_item);
                list_del(&cap->caps_item);
                kmem_cache_free(ceph_cap_cachep, cap);
        }
-       caps_total_count = 0;
-       caps_avail_count = 0;
-       caps_use_count = 0;
-       caps_reserve_count = 0;
-       caps_min_count = 0;
-       spin_unlock(&caps_list_lock);
+       mdsc->caps_total_count = 0;
+       mdsc->caps_avail_count = 0;
+       mdsc->caps_use_count = 0;
+       mdsc->caps_reserve_count = 0;
+       mdsc->caps_min_count = 0;
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(int delta)
+void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
 {
-       spin_lock(&caps_list_lock);
-       caps_min_count += delta;
-       BUG_ON(caps_min_count < 0);
-       spin_unlock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_min_count += delta;
+       BUG_ON(mdsc->caps_min_count < 0);
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
-int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+                     struct ceph_cap_reservation *ctx, int need)
 {
        int i;
        struct ceph_cap *cap;
@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
        dout("reserve caps ctx=%p need=%d\n", ctx, need);
 
        /* first reserve any caps that are already allocated */
-       spin_lock(&caps_list_lock);
-       if (caps_avail_count >= need)
+       spin_lock(&mdsc->caps_list_lock);
+       if (mdsc->caps_avail_count >= need)
                have = need;
        else
-               have = caps_avail_count;
-       caps_avail_count -= have;
-       caps_reserve_count += have;
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+               have = mdsc->caps_avail_count;
+       mdsc->caps_avail_count -= have;
+       mdsc->caps_reserve_count += have;
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                        mdsc->caps_reserve_count +
+                                        mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 
        for (i = have; i < need; i++) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
        }
        BUG_ON(have + alloc != need);
 
-       spin_lock(&caps_list_lock);
-       caps_total_count += alloc;
-       caps_reserve_count += alloc;
-       list_splice(&newcaps, &caps_list);
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_total_count += alloc;
+       mdsc->caps_reserve_count += alloc;
+       list_splice(&newcaps, &mdsc->caps_list);
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                        mdsc->caps_reserve_count +
+                                        mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 
        ctx->count = need;
        dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
-            ctx, caps_total_count, caps_use_count, caps_reserve_count,
-            caps_avail_count);
+            ctx, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
        return 0;
 
 out_alloc_count:
@@ -220,92 +205,107 @@ out_alloc_count:
        return ret;
 }
 
-int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
+int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+                       struct ceph_cap_reservation *ctx)
 {
        dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
        if (ctx->count) {
-               spin_lock(&caps_list_lock);
-               BUG_ON(caps_reserve_count < ctx->count);
-               caps_reserve_count -= ctx->count;
-               caps_avail_count += ctx->count;
+               spin_lock(&mdsc->caps_list_lock);
+               BUG_ON(mdsc->caps_reserve_count < ctx->count);
+               mdsc->caps_reserve_count -= ctx->count;
+               mdsc->caps_avail_count += ctx->count;
                ctx->count = 0;
                dout("unreserve caps %d = %d used + %d resv + %d avail\n",
-                    caps_total_count, caps_use_count, caps_reserve_count,
-                    caps_avail_count);
-               BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-                      caps_avail_count);
-               spin_unlock(&caps_list_lock);
+                    mdsc->caps_total_count, mdsc->caps_use_count,
+                    mdsc->caps_reserve_count, mdsc->caps_avail_count);
+               BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                                mdsc->caps_reserve_count +
+                                                mdsc->caps_avail_count);
+               spin_unlock(&mdsc->caps_list_lock);
        }
        return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
+static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
+                               struct ceph_cap_reservation *ctx)
 {
        struct ceph_cap *cap = NULL;
 
        /* temporary, until we do something about cap import/export */
-       if (!ctx)
-               return kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
+       if (!ctx) {
+               cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
+               if (cap) {
+                       /* counters are protected by caps_list_lock */
+                       spin_lock(&mdsc->caps_list_lock);
+                       mdsc->caps_use_count++;
+                       mdsc->caps_total_count++;
+                       spin_unlock(&mdsc->caps_list_lock);
+               }
+               return cap;
+       }
 
-       spin_lock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
        dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-            ctx, ctx->count, caps_total_count, caps_use_count,
-            caps_reserve_count, caps_avail_count);
+            ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
        BUG_ON(!ctx->count);
-       BUG_ON(ctx->count > caps_reserve_count);
-       BUG_ON(list_empty(&caps_list));
+       BUG_ON(ctx->count > mdsc->caps_reserve_count);
+       BUG_ON(list_empty(&mdsc->caps_list));
 
        ctx->count--;
-       caps_reserve_count--;
-       caps_use_count++;
+       mdsc->caps_reserve_count--;
+       mdsc->caps_use_count++;
 
-       cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+       cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
        list_del(&cap->caps_item);
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+              mdsc->caps_reserve_count + mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
        return cap;
 }
 
-void ceph_put_cap(struct ceph_cap *cap)
+void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
 {
-       spin_lock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
        dout("put_cap %p %d = %d used + %d resv + %d avail\n",
-            cap, caps_total_count, caps_use_count,
-            caps_reserve_count, caps_avail_count);
-       caps_use_count--;
+            cap, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
+       mdsc->caps_use_count--;
        /*
         * Keep some preallocated caps around (ceph_min_count), to
         * avoid lots of free/alloc churn.
         */
-       if (caps_avail_count >= caps_reserve_count + caps_min_count) {
-               caps_total_count--;
+       if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
+                                     mdsc->caps_min_count) {
+               mdsc->caps_total_count--;
                kmem_cache_free(ceph_cap_cachep, cap);
        } else {
-               caps_avail_count++;
-               list_add(&cap->caps_item, &caps_list);
+               mdsc->caps_avail_count++;
+               list_add(&cap->caps_item, &mdsc->caps_list);
        }
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+              mdsc->caps_reserve_count + mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
 void ceph_reservation_status(struct ceph_client *client,
                             int *total, int *avail, int *used, int *reserved,
                             int *min)
 {
+       struct ceph_mds_client *mdsc = &client->mdsc;
+
        if (total)
-               *total = caps_total_count;
+               *total = mdsc->caps_total_count;
        if (avail)
-               *avail = caps_avail_count;
+               *avail = mdsc->caps_avail_count;
        if (used)
-               *used = caps_use_count;
+               *used = mdsc->caps_use_count;
        if (reserved)
-               *reserved = caps_reserve_count;
+               *reserved = mdsc->caps_reserve_count;
        if (min)
-               *min = caps_min_count;
+               *min = mdsc->caps_min_count;
 }
 
 /*
@@ -330,22 +327,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
        return NULL;
 }
 
+struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
+{
+       struct ceph_cap *cap;
+
+       spin_lock(&ci->vfs_inode.i_lock);
+       cap = __get_cap_for_mds(ci, mds);
+       spin_unlock(&ci->vfs_inode.i_lock);
+       return cap;
+}
+
 /*
- * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else
- * -1.
+ * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
  */
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq)
+static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
 {
        struct ceph_cap *cap;
        int mds = -1;
        struct rb_node *p;
 
-       /* prefer mds with WR|WRBUFFER|EXCL caps */
+       /* prefer mds with WR|BUFFER|EXCL caps */
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                cap = rb_entry(p, struct ceph_cap, ci_node);
                mds = cap->mds;
-               if (mseq)
-                       *mseq = cap->mseq;
                if (cap->issued & (CEPH_CAP_FILE_WR |
                                   CEPH_CAP_FILE_BUFFER |
                                   CEPH_CAP_FILE_EXCL))
@@ -358,7 +362,7 @@ int ceph_get_cap_mds(struct inode *inode)
 {
        int mds;
        spin_lock(&inode->i_lock);
-       mds = __ceph_get_cap_mds(ceph_inode(inode), NULL);
+       mds = __ceph_get_cap_mds(ceph_inode(inode));
        spin_unlock(&inode->i_lock);
        return mds;
 }
@@ -477,8 +481,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
         * Each time we receive FILE_CACHE anew, we increment
         * i_rdcache_gen.
         */
-       if ((issued & CEPH_CAP_FILE_CACHE) &&
-           (had & CEPH_CAP_FILE_CACHE) == 0)
+       if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+           (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
                ci->i_rdcache_gen++;
 
        /*
@@ -537,7 +541,7 @@ retry:
                        new_cap = NULL;
                } else {
                        spin_unlock(&inode->i_lock);
-                       new_cap = get_cap(caps_reservation);
+                       new_cap = get_cap(mdsc, caps_reservation);
                        if (new_cap == NULL)
                                return -ENOMEM;
                        goto retry;
@@ -582,6 +586,7 @@ retry:
                } else {
                        pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
                               realmino);
+                       WARN_ON(!realm);
                }
        }
 
@@ -621,7 +626,7 @@ retry:
        if (fmode >= 0)
                __ceph_get_fmode(ci, fmode);
        spin_unlock(&inode->i_lock);
-       wake_up(&ci->i_cap_wq);
+       wake_up_all(&ci->i_cap_wq);
        return 0;
 }
 
@@ -825,7 +830,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
 {
        int want = 0;
        int mode;
-       for (mode = 0; mode < 4; mode++)
+       for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
                if (ci->i_nr_by_mode[mode])
                        want |= ceph_caps_for_mode(mode);
        return want;
@@ -895,7 +900,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
                ci->i_auth_cap = NULL;
 
        if (removed)
-               ceph_put_cap(cap);
+               ceph_put_cap(mdsc, cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
                struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -981,6 +986,46 @@ static int send_cap_msg(struct ceph_mds_session *session,
        return 0;
 }
 
+static void __queue_cap_release(struct ceph_mds_session *session,
+                               u64 ino, u64 cap_id, u32 migrate_seq,
+                               u32 issue_seq)
+{
+       struct ceph_msg *msg;
+       struct ceph_mds_cap_release *head;
+       struct ceph_mds_cap_item *item;
+
+       spin_lock(&session->s_cap_lock);
+       BUG_ON(!session->s_num_cap_releases);
+       msg = list_first_entry(&session->s_cap_releases,
+                              struct ceph_msg, list_head);
+
+       dout(" adding %llx release to mds%d msg %p (%d left)\n",
+            ino, session->s_mds, msg, session->s_num_cap_releases);
+
+       BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
+       head = msg->front.iov_base;
+       head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
+       item = msg->front.iov_base + msg->front.iov_len;
+       item->ino = cpu_to_le64(ino);
+       item->cap_id = cpu_to_le64(cap_id);
+       item->migrate_seq = cpu_to_le32(migrate_seq);
+       item->seq = cpu_to_le32(issue_seq);
+
+       session->s_num_cap_releases--;
+
+       msg->front.iov_len += sizeof(*item);
+       if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+               dout(" release msg %p full\n", msg);
+               list_move_tail(&msg->list_head, &session->s_cap_releases_done);
+       } else {
+               dout(" release msg %p at %d/%d (%d)\n", msg,
+                    (int)le32_to_cpu(head->num),
+                    (int)CEPH_CAPS_PER_RELEASE,
+                    (int)msg->front.iov_len);
+       }
+       spin_unlock(&session->s_cap_lock);
+}
+
 /*
  * Queue cap releases when an inode is dropped from our cache.  Since
  * inode is about to be destroyed, there is no need for i_lock.
@@ -994,41 +1039,9 @@ void ceph_queue_caps_release(struct inode *inode)
        while (p) {
                struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
                struct ceph_mds_session *session = cap->session;
-               struct ceph_msg *msg;
-               struct ceph_mds_cap_release *head;
-               struct ceph_mds_cap_item *item;
 
-               spin_lock(&session->s_cap_lock);
-               BUG_ON(!session->s_num_cap_releases);
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg, list_head);
-
-               dout(" adding %p release to mds%d msg %p (%d left)\n",
-                    inode, session->s_mds, msg, session->s_num_cap_releases);
-
-               BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
-               head = msg->front.iov_base;
-               head->num = cpu_to_le32(le32_to_cpu(head->num) + 1);
-               item = msg->front.iov_base + msg->front.iov_len;
-               item->ino = cpu_to_le64(ceph_ino(inode));
-               item->cap_id = cpu_to_le64(cap->cap_id);
-               item->migrate_seq = cpu_to_le32(cap->mseq);
-               item->seq = cpu_to_le32(cap->issue_seq);
-
-               session->s_num_cap_releases--;
-
-               msg->front.iov_len += sizeof(*item);
-               if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
-                       dout(" release msg %p full\n", msg);
-                       list_move_tail(&msg->list_head,
-                                      &session->s_cap_releases_done);
-               } else {
-                       dout(" release msg %p at %d/%d (%d)\n", msg,
-                            (int)le32_to_cpu(head->num),
-                            (int)CEPH_CAPS_PER_RELEASE,
-                            (int)msg->front.iov_len);
-               }
-               spin_unlock(&session->s_cap_lock);
+               __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
+                                   cap->mseq, cap->issue_seq);
                p = rb_next(p);
                __ceph_remove_cap(cap);
        }
@@ -1069,6 +1082,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        gid_t gid;
        struct ceph_mds_session *session;
        u64 xattr_version = 0;
+       struct ceph_buffer *xattr_blob = NULL;
        int delayed = 0;
        u64 flush_tid = 0;
        int i;
@@ -1147,9 +1161,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        gid = inode->i_gid;
        mode = inode->i_mode;
 
-       if (dropping & CEPH_CAP_XATTR_EXCL) {
+       if (flushing & CEPH_CAP_XATTR_EXCL) {
                __ceph_build_xattrs_blob(ci);
-               xattr_version = ci->i_xattrs.version + 1;
+               xattr_blob = ci->i_xattrs.blob;
+               xattr_version = ci->i_xattrs.version;
        }
 
        spin_unlock(&inode->i_lock);
@@ -1157,9 +1172,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
                op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
                size, max_size, &mtime, &atime, time_warp_seq,
-               uid, gid, mode,
-               xattr_version,
-               (flushing & CEPH_CAP_XATTR_EXCL) ? ci->i_xattrs.blob : NULL,
+               uid, gid, mode, xattr_version, xattr_blob,
                follows);
        if (ret < 0) {
                dout("error sending cap msg, must requeue %p\n", inode);
@@ -1167,7 +1180,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        }
 
        if (wake)
-               wake_up(&ci->i_cap_wq);
+               wake_up_all(&ci->i_cap_wq);
 
        return delayed;
 }
@@ -1183,6 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
  */
 void __ceph_flush_snaps(struct ceph_inode_info *ci,
                        struct ceph_mds_session **psession)
+               __releases(ci->vfs_inode.i_lock)
+               __acquires(ci->vfs_inode.i_lock)
 {
        struct inode *inode = &ci->vfs_inode;
        int mds;
@@ -1218,7 +1233,13 @@ retry:
                BUG_ON(capsnap->dirty == 0);
 
                /* pick mds, take s_mutex */
-               mds = __ceph_get_cap_mds(ci, &mseq);
+               if (ci->i_auth_cap == NULL) {
+                       dout("no auth cap (migrating?), doing nothing\n");
+                       goto out;
+               }
+               mds = ci->i_auth_cap->session->s_mds;
+               mseq = ci->i_auth_cap->mseq;
+
                if (session && session->s_mds != mds) {
                        dout("oops, wrong session %p mutex\n", session);
                        mutex_unlock(&session->s_mutex);
@@ -1237,8 +1258,8 @@ retry:
                        }
                        /*
                         * if session == NULL, we raced against a cap
-                        * deletion.  retry, and we'll get a better
-                        * @mds value next time.
+                        * deletion or migration.  retry, and we'll
+                        * get a better @mds value next time.
                         */
                        spin_lock(&inode->i_lock);
                        goto retry;
@@ -1276,6 +1297,7 @@ retry:
        list_del_init(&ci->i_snap_flush_item);
        spin_unlock(&mdsc->snap_flush_lock);
 
+out:
        if (psession)
                *psession = session;
        else if (session) {
@@ -1421,7 +1443,6 @@ static int try_nonblocking_invalidate(struct inode *inode)
  */
 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
                     struct ceph_mds_session *session)
-       __releases(session->s_mutex)
 {
        struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1496,11 +1517,13 @@ retry_locked:
            ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
            ci->i_rdcache_gen &&                     /* may have cached pages */
            (file_wanted == 0 ||                     /* no open files */
-            (revoking & CEPH_CAP_FILE_CACHE)) &&     /*  or revoking cache */
+            (revoking & (CEPH_CAP_FILE_CACHE|
+                         CEPH_CAP_FILE_LAZYIO))) && /*  or revoking cache */
            !tried_invalidate) {
                dout("check_caps trying to invalidate on %p\n", inode);
                if (try_nonblocking_invalidate(inode) < 0) {
-                       if (revoking & CEPH_CAP_FILE_CACHE) {
+                       if (revoking & (CEPH_CAP_FILE_CACHE|
+                                       CEPH_CAP_FILE_LAZYIO)) {
                                dout("check_caps queuing invalidate\n");
                                queue_invalidate = 1;
                                ci->i_rdcache_revoking = ci->i_rdcache_gen;
@@ -2139,7 +2162,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
        else if (flushsnaps)
                ceph_flush_snaps(ci);
        if (wake)
-               wake_up(&ci->i_cap_wq);
+               wake_up_all(&ci->i_cap_wq);
        if (put)
                iput(inode);
 }
@@ -2215,7 +2238,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                iput(inode);
        } else if (complete_capsnap) {
                ceph_flush_snaps(ci);
-               wake_up(&ci->i_cap_wq);
+               wake_up_all(&ci->i_cap_wq);
        }
        if (drop_capsnap)
                iput(inode);
@@ -2236,8 +2259,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                             struct ceph_mds_session *session,
                             struct ceph_cap *cap,
                             struct ceph_buffer *xattr_buf)
-       __releases(inode->i_lock)
-       __releases(session->s_mutex)
+               __releases(inode->i_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -2264,6 +2286,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
         * will invalidate _after_ writeback.)
         */
        if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+           (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
            !ci->i_wrbuffer_ref) {
                if (try_nonblocking_invalidate(inode) == 0) {
                        revoked_rdcache = 1;
@@ -2355,15 +2378,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
 
        /* revocation, grant, or no-op? */
        if (cap->issued & ~newcaps) {
-               dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued),
-                    ceph_cap_string(newcaps));
-               if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
-                       writeback = 1; /* will delay ack */
-               else if (dirty & ~newcaps)
-                       check_caps = 1;  /* initiate writeback in check_caps */
-               else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
-                          revoked_rdcache)
-                       check_caps = 2;     /* send revoke ack in check_caps */
+               int revoking = cap->issued & ~newcaps;
+
+               dout("revocation: %s -> %s (revoking %s)\n",
+                    ceph_cap_string(cap->issued),
+                    ceph_cap_string(newcaps),
+                    ceph_cap_string(revoking));
+               if (revoking & used & CEPH_CAP_FILE_BUFFER)
+                       writeback = 1;  /* initiate writeback; will delay ack */
+               else if (revoking == CEPH_CAP_FILE_CACHE &&
+                        (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
+                        queue_invalidate)
+                       ; /* do nothing yet, invalidation will be queued */
+               else if (cap == ci->i_auth_cap)
+                       check_caps = 1; /* check auth cap only */
+               else
+                       check_caps = 2; /* check all caps */
                cap->issued = newcaps;
                cap->implemented |= newcaps;
        } else if (cap->issued == newcaps) {
@@ -2391,7 +2421,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        if (queue_invalidate)
                ceph_queue_invalidate(inode);
        if (wake)
-               wake_up(&ci->i_cap_wq);
+               wake_up_all(&ci->i_cap_wq);
 
        if (check_caps == 1)
                ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
@@ -2446,7 +2476,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                                         struct ceph_inode_info,
                                         i_flushing_item)->vfs_inode);
                mdsc->num_cap_flushing--;
-               wake_up(&mdsc->cap_flushing_wq);
+               wake_up_all(&mdsc->cap_flushing_wq);
                dout(" inode %p now !flushing\n", inode);
 
                if (ci->i_dirty_caps == 0) {
@@ -2458,7 +2488,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                }
        }
        spin_unlock(&mdsc->cap_dirty_lock);
-       wake_up(&ci->i_cap_wq);
+       wake_up_all(&ci->i_cap_wq);
 
 out:
        spin_unlock(&inode->i_lock);
@@ -2554,7 +2584,8 @@ static void handle_cap_trunc(struct inode *inode,
  * caller holds s_mutex
  */
 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
-                             struct ceph_mds_session *session)
+                             struct ceph_mds_session *session,
+                             int *open_target_sessions)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -2586,6 +2617,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        ci->i_cap_exporting_mds = mds;
                        ci->i_cap_exporting_mseq = mseq;
                        ci->i_cap_exporting_issued = cap->issued;
+
+                       /*
+                        * make sure we have open sessions with all possible
+                        * export targets, so that we get the matching IMPORT
+                        */
+                       *open_target_sessions = 1;
                }
                __ceph_remove_cap(cap);
        }
@@ -2655,12 +2692,16 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct ceph_mds_caps *h;
        int mds = session->s_mds;
        int op;
-       u32 seq;
+       u32 seq, mseq;
        struct ceph_vino vino;
        u64 cap_id;
        u64 size, max_size;
        u64 tid;
        void *snaptrace;
+       size_t snaptrace_len;
+       void *flock;
+       u32 flock_len;
+       int open_target_sessions = 0;
 
        dout("handle_caps from mds%d\n", mds);
 
@@ -2669,15 +2710,30 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = msg->front.iov_base;
-       snaptrace = h + 1;
        op = le32_to_cpu(h->op);
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
        cap_id = le64_to_cpu(h->cap_id);
        seq = le32_to_cpu(h->seq);
+       mseq = le32_to_cpu(h->migrate_seq);
        size = le64_to_cpu(h->size);
        max_size = le64_to_cpu(h->max_size);
 
+       snaptrace = h + 1;
+       snaptrace_len = le32_to_cpu(h->snap_trace_len);
+
+       if (le16_to_cpu(msg->hdr.version) >= 2) {
+               void *p, *end;
+
+               p = snaptrace + snaptrace_len;
+               end = msg->front.iov_base + msg->front.iov_len;
+               ceph_decode_32_safe(&p, end, flock_len, bad);
+               flock = p;
+       } else {
+               flock = NULL;
+               flock_len = 0;
+       }
+
        mutex_lock(&session->s_mutex);
        session->s_seq++;
        dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2689,6 +2745,18 @@ void ceph_handle_caps(struct ceph_mds_session *session,
             vino.snap, inode);
        if (!inode) {
                dout(" i don't have ino %llx\n", vino.ino);
+
+               if (op == CEPH_CAP_OP_IMPORT)
+                       __queue_cap_release(session, vino.ino, cap_id,
+                                           mseq, seq);
+
+               /*
+                * send any full release message to try to move things
+                * along for the mds (who clearly thinks we still have this
+                * cap).
+                */
+               ceph_add_cap_releases(mdsc, session);
+               ceph_send_cap_releases(mdsc, session);
                goto done;
        }
 
@@ -2699,12 +2767,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                goto done;
 
        case CEPH_CAP_OP_EXPORT:
-               handle_cap_export(inode, h, session);
+               handle_cap_export(inode, h, session, &open_target_sessions);
                goto done;
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, session,
-                                 snaptrace, le32_to_cpu(h->snap_trace_len));
+                                 snaptrace, snaptrace_len);
                ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
                                session);
                goto done_unlocked;
@@ -2714,7 +2782,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        spin_lock(&inode->i_lock);
        cap = __get_cap_for_mds(ceph_inode(inode), mds);
        if (!cap) {
-               dout("no cap on %p ino %llx.%llx from mds%d, releasing\n",
+               dout(" no cap on %p ino %llx.%llx from mds%d\n",
                     inode, ceph_ino(inode), ceph_snap(inode), mds);
                spin_unlock(&inode->i_lock);
                goto done;
@@ -2746,6 +2814,8 @@ done:
 done_unlocked:
        if (inode)
                iput(inode);
+       if (open_target_sessions)
+               ceph_mdsc_open_export_target_sessions(mdsc, session);
        return;
 
 bad:
@@ -2865,18 +2935,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_cap *cap;
        struct ceph_mds_request_release *rel = *p;
+       int used, dirty;
        int ret = 0;
-       int used = 0;
 
        spin_lock(&inode->i_lock);
        used = __ceph_caps_used(ci);
+       dirty = __ceph_caps_dirty(ci);
 
-       dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode,
-            mds, ceph_cap_string(used), ceph_cap_string(drop),
+       dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
+            inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
             ceph_cap_string(unless));
 
-       /* only drop unused caps */
-       drop &= ~used;
+       /* only drop unused, clean caps */
+       drop &= ~(used | dirty);
 
        cap = __get_cap_for_mds(ci, mds);
        if (cap && __cap_is_valid(cap)) {
@@ -2956,6 +3027,7 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry,
                memcpy(*p, dentry->d_name.name, dentry->d_name.len);
                *p += dentry->d_name.len;
                rel->dname_seq = cpu_to_le32(di->lease_seq);
+               __ceph_mdsc_drop_dentry_lease(dentry);
        }
        spin_unlock(&dentry->d_lock);
        return ret;