]> bbs.cooldavid.org Git - net-next-2.6.git/blobdiff - fs/ocfs2/dlmglue.c
ocfs2: Pass lksbs back from stackglue ast/bast functions.
[net-next-2.6.git] / fs / ocfs2 / dlmglue.c
index c5e4a49e3a1257b48a08abe717236cd713b4dd1f..1ba67df0092469f56d553b9685a272e4c75e86e6 100644 (file)
@@ -297,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
                lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
 }
 
+static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(union ocfs2_dlm_lksb *lksb)
+{
+       return container_of(lksb, struct ocfs2_lock_res, l_lksb);
+}
+
 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
 {
        BUG_ON(!ocfs2_is_inode_lock(lockres));
@@ -875,6 +880,14 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
        lockres->l_level = lockres->l_requested;
+
+       /*
+        * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
+        * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
+        * downconverting the lock before the upconvert has fully completed.
+        */
+       lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
+
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
        mlog_exit_void();
@@ -907,8 +920,6 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
 
        assert_spin_locked(&lockres->l_lock);
 
-       lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
-
        if (level > lockres->l_blocking) {
                /* only schedule a downconvert if we haven't already scheduled
                 * one that goes low enough to satisfy the level we're
@@ -921,6 +932,9 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                lockres->l_blocking = level;
        }
 
+       if (needs_downconvert)
+               lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
+
        mlog_exit(needs_downconvert);
        return needs_downconvert;
 }
@@ -1032,9 +1046,9 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
 }
 
 
-static void ocfs2_blocking_ast(void *opaque, int level)
+static void ocfs2_blocking_ast(union ocfs2_dlm_lksb *lksb, int level)
 {
-       struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        int needs_downconvert;
        unsigned long flags;
@@ -1063,9 +1077,9 @@ static void ocfs2_blocking_ast(void *opaque, int level)
        ocfs2_wake_downconvert_thread(osb);
 }
 
-static void ocfs2_locking_ast(void *opaque)
+static void ocfs2_locking_ast(union ocfs2_dlm_lksb *lksb)
 {
-       struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
        int status;
@@ -1133,6 +1147,7 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
        mlog_entry_void();
        spin_lock_irqsave(&lockres->l_lock, flags);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
+       lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
        if (convert)
                lockres->l_action = OCFS2_AST_INVALID;
        else
@@ -1179,8 +1194,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
-                            OCFS2_LOCK_ID_MAX_LEN - 1,
-                            lockres);
+                            OCFS2_LOCK_ID_MAX_LEN - 1);
        lockres_clear_pending(lockres, gen, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -1323,13 +1337,13 @@ static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
 again:
        wait = 0;
 
+       spin_lock_irqsave(&lockres->l_lock, flags);
+
        if (catch_signals && signal_pending(current)) {
                ret = -ERESTARTSYS;
-               goto out;
+               goto unlock;
        }
 
-       spin_lock_irqsave(&lockres->l_lock, flags);
-
        mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
                        "Cluster lock called on freeing lockres %s! flags "
                        "0x%lx\n", lockres->l_name, lockres->l_flags);
@@ -1346,6 +1360,25 @@ again:
                goto unlock;
        }
 
+       if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
+               /*
+                * We've upconverted. If the lock now has a level we can
+                * work with, we take it. If, however, the lock is not at the
+                * required level, we go thru the full cycle. One way this could
+                * happen is if a process requesting an upconvert to PR is
+                * closely followed by another requesting upconvert to an EX.
+                * If the process requesting EX lands here, we want it to
+                * continue attempting to upconvert and let the process
+                * requesting PR take the lock.
+                * If multiple processes request upconvert to PR, the first one
+                * here will take the lock. The others will have to go thru the
+                * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
+                * downconvert request.
+                */
+               if (level <= lockres->l_level)
+                       goto update_holders;
+       }
+
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
            !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
                /* is the lock is currently blocked on behalf of
@@ -1392,8 +1425,7 @@ again:
                                     &lockres->l_lksb,
                                     lkm_flags,
                                     lockres->l_name,
-                                    OCFS2_LOCK_ID_MAX_LEN - 1,
-                                    lockres);
+                                    OCFS2_LOCK_ID_MAX_LEN - 1);
                lockres_clear_pending(lockres, gen, osb);
                if (ret) {
                        if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
@@ -1416,11 +1448,14 @@ again:
                goto again;
        }
 
+update_holders:
        /* Ok, if we get here then we're good to go. */
        ocfs2_inc_holders(lockres, level);
 
        ret = 0;
 unlock:
+       lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
+
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 out:
        /*
@@ -1827,8 +1862,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
        ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
-                            lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
-                            lockres);
+                            lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
        if (ret) {
                if (!trylock || (ret != -EAGAIN)) {
                        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -3024,9 +3058,9 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
        mlog_exit_void();
 }
 
-static void ocfs2_unlock_ast(void *opaque, int error)
+static void ocfs2_unlock_ast(union ocfs2_dlm_lksb *lksb, int error)
 {
-       struct ocfs2_lock_res *lockres = opaque;
+       struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        unsigned long flags;
 
        mlog_entry_void();
@@ -3135,8 +3169,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
 
        mlog(0, "lock %s\n", lockres->l_name);
 
-       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
-                              lockres);
+       ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
                mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
@@ -3155,7 +3188,7 @@ out:
 /* Mark the lockres as being dropped. It will no longer be
  * queued if blocking, but we still may have to wait on it
  * being dequeued from the downconvert thread before we can consider
- * it safe to drop. 
+ * it safe to drop.
  *
  * You can *not* attempt to call cluster_lock on this lockres anymore. */
 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
@@ -3277,8 +3310,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
-                            OCFS2_LOCK_ID_MAX_LEN - 1,
-                            lockres);
+                            OCFS2_LOCK_ID_MAX_LEN - 1);
        lockres_clear_pending(lockres, generation, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
@@ -3333,7 +3365,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
        mlog(0, "lock %s\n", lockres->l_name);
 
        ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
-                              DLM_LKF_CANCEL, lockres);
+                              DLM_LKF_CANCEL);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 0);
@@ -3352,6 +3384,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
        unsigned long flags;
        int blocking;
        int new_level;
+       int level;
        int ret = 0;
        int set_lvb = 0;
        unsigned int gen;
@@ -3360,9 +3393,17 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
 
        spin_lock_irqsave(&lockres->l_lock, flags);
 
-       BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
-
 recheck:
+       /*
+        * Is it still blocking? If not, we have no more work to do.
+        */
+       if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
+               BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
+               spin_unlock_irqrestore(&lockres->l_lock, flags);
+               ret = 0;
+               goto leave;
+       }
+
        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
                /* XXX
                 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
@@ -3401,6 +3442,31 @@ recheck:
                goto leave;
        }
 
+       /*
+        * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
+        * set when the ast is received for an upconvert just before the
+        * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
+        * on the heels of the ast, we want to delay the downconvert just
+        * enough to allow the up requestor to do its task. Because this
+        * lock is in the blocked queue, the lock will be downconverted
+        * as soon as the requestor is done with the lock.
+        */
+       if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
+               goto leave_requeue;
+
+       /*
+        * How can we block and yet be at NL?  We were trying to upconvert
+        * from NL and got canceled.  The code comes back here, and now
+        * we notice and clear BLOCKING.
+        */
+       if (lockres->l_level == DLM_LOCK_NL) {
+               BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
+               lockres->l_blocking = DLM_LOCK_NL;
+               lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
+               spin_unlock_irqrestore(&lockres->l_lock, flags);
+               goto leave;
+       }
+
        /* if we're blocking an exclusive and we have *any* holders,
         * then requeue. */
        if ((lockres->l_blocking == DLM_LOCK_EX)
@@ -3438,6 +3504,7 @@ recheck:
         * may sleep, so we save off a copy of what we're blocking as
         * it may change while we're not holding the spin lock. */
        blocking = lockres->l_blocking;
+       level = lockres->l_level;
        spin_unlock_irqrestore(&lockres->l_lock, flags);
 
        ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
@@ -3446,7 +3513,7 @@ recheck:
                goto leave;
 
        spin_lock_irqsave(&lockres->l_lock, flags);
-       if (blocking != lockres->l_blocking) {
+       if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
                /* If this changed underneath us, then we can't drop
                 * it just yet. */
                goto recheck;