J.U.C剖析與解讀1(Lock的實現)

J.U.C剖析與解讀1(Lock的實現)

前言

為了節省各位的時間,我簡單介紹一下這篇文章。這篇文章主要分為三塊:Lock的實現,AQS的由來(通過演變的方式),JUC三大工具類的使用與原理剖析。

  • Lock的實現:簡單介紹ReentrantLock,ReentrantReadWriteLock兩種JUC下經典Lock的實現,並通過手寫簡化版的ReentrantLock和ReentrantReadWriteLock,從而了解其實現原理。

  • AQS的由來:通過對兩個簡化版Lock的多次迭代,從而獲得AQS。並且最終的Lock實現了J.U.C下Lock接口,既可以使用我們演變出來的AQS,也可以對接JUC下的AQS。這樣一方面可以幫助大家理解AQS,另一方面大家可以從中了解,如何利用AQS實現自定義Lock。而這兒,對後續JUC下的三大Lock工具的理解有非常大的幫助。

  • JUC三大工具:經過前兩個部分的學習,這個部分不要太easy。可以很容易地理解CountDownLatch,Semaphore,CyclicBarrier的內部運行及實現原理。

不過,由於這三塊內容較多,所以我將它拆分為三篇子文章進行論述。

一,介紹

Lock

Lock接口位於J.U.C下locks包內,其定義了Lock應該具備的方法。

Lock 方法簽名:

  • void lock():獲取鎖(不死不休,拿不到就一直等)
  • boolean tryLock():獲取鎖(淺嘗輒止,拿不到就算了)
  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException:獲取鎖(過時不候,在一定時間內拿不到鎖,就算了)
  • void lockInterruptibly() throws InterruptedException:獲取鎖(任人擺布,xxx)
  • void unlock():釋放鎖
  • Condition newCondition():獲得Condition對象

ReentrantLock

簡介

ReentrantLock是一個可重入鎖,一個悲觀鎖,默認是非公平鎖(但是可以通過Constructor設置為公平鎖)。

Lock應用

ReentrantLock通過構造方法獲得lock對象。利用lock.lock()方法對當前線程進行加鎖操作,利用lock.unlock()方法對當前線程進行釋放鎖操作。

Condition應用

通過


    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

獲得Condition對象(Condition是J.U.C下locks包下的接口)。

通過Condition對象的.await(*),可以將當前線程的線程狀態切換到Waiting狀態(如果是有參,則是Time Waiting狀態)。而.signal(),.signalAll()等方法則正好相反,恢複線程狀態為Runnable狀態。

ReentrantReadWriteLock

簡介

ReentrantLock和Synchronized功能類似,更加靈活,當然,也更加手動了。

大家都知道,只有涉及資源的競爭時,採用同步的必要。寫操作自然屬於資源的競爭,但是讀操作並不屬於資源的競爭行為。簡單說,就是寫操作最多只能一個線程(因為寫操作涉及數據改變,多個線程同時寫,會產生資源同步問題),而讀操作可以有多個(因為不涉及數據改變)。

所以在讀多寫少的場景下,ReentrantLock就比較浪費資源了。這就需要一種能夠區分讀寫操作的鎖,那就是ReentrantReadWriteLock。通過ReentrantReadWriteLock,可以獲得讀鎖與寫鎖。當寫鎖存在時,有且只能有一個線程持有鎖。當寫鎖不存在時,可以有多個線程持有讀鎖(寫鎖,必須等待讀鎖釋放完,才可以持有鎖)。

Lock及Condition應用


        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
        readLock.lock();
        readLock.unlock();

        readLock.newCondition();

        ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
        writeLock.lock();
        writeLock.unlock();

        writeLock.newCondition();

與之前ReentrantLock應用的區別,就是需要通過lock.readLock()與lock.writeLock()來獲取讀鎖,寫鎖,再進行加鎖,釋放鎖的操作,以及Condition的獲取操作。

二,手寫ReentrantLock

獲取需求

終於上大餐了。

首先第一步操作,我們需要確定我們要做什麼。

我們要做一個鎖,這裏姑且命名為JarryReentrantLock。

這個鎖,需要具備以下特性:可重入鎖,悲觀鎖。

另外,為了更加規範,以後更好地融入到AQS中,該鎖需要實現Lock接口。

而Lock的方法簽名,在文章一開始,就已經寫了,這裏不再贅述。

當然,我們這裏只是一個demo,所以就不實現Condition了。另外tryLock(long,TimeUnit)也不再實現,因為實現了整體后,這個實現其實並沒有想象中那麼困難。

JarryReentrantLock實現原理

既然需要已經確定,並且API也確定了。

那麼第二步操作,就是簡單思考一下,如何實現。

類成員方面:

  1. 首先,我們需要一個owner屬性,來保存持有鎖的線程對象。

  2. 其次,由於是可重入鎖,所以我們需要一個count來保存重入次數。

  3. 最後,我們需要一個waiters屬性,來保存那些競爭鎖失敗后,還在等待(不死不休型)的線程對象。

類方法方面:

  • tryLock:嘗試獲取鎖,成功返回true,失敗返回false。首先是獲取鎖的行為,可以通過CAS操作實現,或者更簡單一些,通過Atomic包實現(其底層也還是CAS)。另外,由於是可重入鎖,所以在嘗試獲取鎖時,需要判斷嘗試獲取鎖的線程是否為當前鎖的持有者線程。
  • lock:嘗試獲取鎖,直到成功獲得鎖。看到這種不成功便成仁的精神,我第一個想法是循環調用tryLock。但是這實在太浪費資源了(畢竟長時間的忙循環是非常消耗CPU資源的)。所以就是手動通過LockSupport.park()將當前線程掛起,然後置入等待隊列waiters中,直到釋放鎖操作來調用。
  • tryUnlock:嘗試解鎖,成功返回true,失敗返回false。首先就是在釋放鎖前,需要判斷嘗試解鎖的線程與鎖的持有者是否為同一個線程(總不能線程A把線程B持有的鎖給釋放了吧)。其次,需要判斷可重入次數count是否為0,從而決定是否將鎖的持有owner設置為null。最後,就是為了避免在count=0時,其他線程同時進行加鎖操作,造成的count>0,owner=null的情況,所以count必須是Atomic,並此處必須採用CAS操作(這裡有些難理解,可以看代碼,有相關註釋)。
  • unlock:解鎖操作。這裏嘗試進行解鎖,如果解鎖成功,需要從等待隊列waiters中喚醒一個線程(喚醒后的線程,由於在循環中,所以會繼續進行競爭鎖操作。但是切記該線程不一定競爭鎖成功,因為可能有新來的線程,搶先一步。那麼該線程會重新進入隊列。所以,此時的JarryReentrantLock只支持不公平鎖)。

JarryReentrantLock實現

那麼接下來,就根據之前的信息,進行編碼吧。


    package tech.jarry.learning.netease;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * @Description: 仿ReentrantLock,實現其基本功能及特性
     * @Author: jarry
     */
    public class JarryReentrantLock implements Lock {
    
        // 加鎖計數器
        private AtomicInteger count = new AtomicInteger(0);
        // 鎖持有者
        private AtomicReference<Thread> owner = new AtomicReference<>();
        // 等待池
        private LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
    
        @Override
        public boolean tryLock() {
            // 判斷當前count是否為0
            int countValue = count.get();
            if (countValue != 0){
                // countValue不為0,意味着鎖被線程持有
                // 進而判斷鎖的持有者owner是否為當前線程
                if (Thread.currentThread() == owner.get()){
                    // 鎖的持有者為當前線程,那麼就重入加鎖
                    // 既然鎖已經被當前線程佔有,那麼就不用擔心count被其他線程修改,即不需要使用CAS
                    count.set(countValue+1);
                    // 執行重入鎖,表示當前線程獲得了鎖
                    return true;
                }else{
                    // 如果當前線程不是鎖的持有者,返回false(該方法是tryLock,即淺嘗輒止)
                    return false;
                }
            }else {
                // countValue為0,意味着當前鎖不被任何線程持有
                // 通過CAS操作將count修改為1
                if (count.compareAndSet(countValue,countValue+1)){
                    // count修改成功,意味着該線程獲得了鎖(只有一個CAS成功修改count,那麼這個CAS的線程就是鎖的持有者)
                    // 至於這裏為什麼不用擔心可見性,其實一開始我也比較擔心其發生類似doubleCheck中重排序造成的問題(tryUnlock是會設置null的)
                    // 看了下源碼,AtomicReference中的value是volatile的
                    owner.set(Thread.currentThread());
                    return true;
                } else {
                    // CAS操作失敗,表示當前線程沒有成功修改count,即獲取鎖失敗
                    return false;
                }
            }
        }
    
        @Override
        public void lock() {
            // lock()【不死不休型】就等於執行tryLock()失敗后,仍然不斷嘗試獲取鎖
            if (!tryLock()){
                // 嘗試獲取鎖失敗后,就只能進入等待隊列waiers,等待機會,繼續tryLock()
                waiters.offer(Thread.currentThread());
    
                // 通過自旋,不斷嘗試獲取鎖
                // 其實我一開始也不是很理解為什麼這樣寫,就可以確保每個執行lock()的線程就在一直競爭鎖。其實,想一想執行lock()的線程都有這個循環。
                // 每次unlock,都會將等待隊列的頭部喚醒(unpark),那麼處在等待隊列頭部的線程就會繼續嘗試獲取鎖,等待隊列的其它線程仍然,繼續阻塞(park)
                // 這也是為什麼需要在循環體中執行一個檢測當前線程是否為等待隊列頭元素等一系列操作。
                // 另外,還有就是:處於等待狀態的線程可能收到錯誤警報和偽喚醒,如果不在循環中檢測等待條件,程序就會在沒有滿足結束條件的情況下退出。反正最後無論那個分支,都return,結束方法了。
                // 即使沒有偽喚醒問題,while還是需要的,因為線程需要二次嘗試獲得鎖
                while (true){
                    // 獲取等待隊列waiters的頭元素(peek表示獲取頭元素,但不刪除。poll表示獲取頭元素,並刪除其在隊列中的位置)
                    Thread head = waiters.peek();
                    // 如果當前線程就是等待隊列中的頭元素head,說明當前等待隊列就剛剛加入的元素。
                    if (head == Thread.currentThread()){
                        // 嘗試再次獲得鎖
                        if (!tryLock()){
                            // 再次嘗試獲取鎖失敗,即將該線程(即當前線程)掛起,
                            LockSupport.park();
                        } else {
                            // 獲取鎖成功,即將該線程(等待隊列的頭元素)從等待隊列waiters中移除
                            waiters.poll();
                            return;
                        }
                    } else {
                        // 如果等待隊列的頭元素head,不是當前線程,表示等待隊列在當前線程加入前,就還有別的線程在等待
                        LockSupport.park();
                    }
                }
            }
        }
    
        private boolean tryUnlock() {
            // 首先確定當前線程是否為鎖持有者
            if (Thread.currentThread() != owner.get()){
                // 如果當前線程不是鎖的持有者,就拋出一個異常
                throw new IllegalMonitorStateException();
            } else {
                // 如果當前線程是鎖的持有者,就先count-1
                // 另外,同一時間執行解鎖的只可能是鎖的持有者線程,故不用擔心原子性問題(原子性問題只有在多線程情況下討論,才有意義)
                int countValue = count.get();
                int countNextValue = countValue - 1;
                count.compareAndSet(countValue,countNextValue);
                if (countNextValue == 0){
                    // 如果當前count為0,意味着鎖的持有者已經完全解鎖成功,故應當失去鎖的持有(即設置owner為null)
                    // 其實我一開始挺糾結的,這裏為什麼需要使用CAS操作呢。反正只有當前線程才可以走到程序這裏。
                    // 首先,為什麼使用CAS。由於count已經設置為0,其它線程已經可以修改count,修改owner了。所以不用CAS就可能將owner=otherThread設置為owner=null了,最終的結果就是徹底卡死
                    //TODO_FINISHED 但是unlock()中的unpark未執行,根本就不會有其它線程啊。囧
                    // 這裏代碼還是為了體現源碼的一些特性。實際源碼是將這些所的特性,抽象到了更高的層次,形成一個AQS。
                    // 雖然tryUnlock是由實現子類實現,但countNextValue是來自countValue(而放在JarryReadWriteLock中就是writeCount),在AQS源碼中,則是通過state實現
    
                    // 其次,有沒有ABA問題。由於ABA需要將CAS的expect值修改為currentThread,而當前線程只能單線程執行,所以不會。
                    // 最後,這裏owner設置為null的操作到底需不需要。實際源碼可能是需要的,但是這裏貌似真的不需要。
                    owner.compareAndSet(Thread.currentThread(),null);
                    // 解鎖成功
                    return true;
                } else {
                    // count不為0,解鎖尚未完全完成
                    return false;
                }
            }
        }
    
        @Override
        public void unlock() {
            if (tryUnlock()){
                // 如果當前線程成功tryUnlock,就表示當前鎖被空置出來了。那就需要從備胎中,啊呸,從waiters中“放“出來一個
                Thread head = waiters.peek();
                // 這裏需要做一個簡單的判斷,防止waiters為空時,拋出異常
                if (head != null){
                    LockSupport.unpark(head);
                }
            }
        }
    
    
        // 非核心功能就不實現了,起碼現在不實現了。
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }

這裏就不進行一些解釋了。因為需要的解釋,在註釋中都寫的很明確了,包括我踩的一些坑。

如果依舊有一些看不懂的地方,或者錯誤的地方,歡迎@我,或者私信我。

三,手寫ReentrantReadWriteLock

獲取需求

與ReentrantLock一樣,首先第一步操作,我們需要確定我們要做什麼。

我們要做一個鎖,這裏姑且命名為JarryReadWriteLock。

這個鎖,需要具備以下特性:讀寫鎖,可重入鎖,悲觀鎖。

一方面了為了更好理解(第一版本,重在理解基礎,不是嘛),另一方面也是為了更好地復用前面ReentrantLock的代碼(畢竟ReentrantLock其實就是讀寫鎖的寫鎖,不是嘛),這裏的JarryReadWriteLock的API不再與官方的ReentrantReadWriteLock相同,而是做了小小調整。直接調用相關讀鎖的加解鎖API,已經相關寫鎖的加解鎖API。具體看代碼部分。

JarryReadWriteLock實現原理

既然需要已經確定,並且API也確定了。

那麼第二步操作,就是簡單思考一下,如何實現。

類成員方面:

  1. 首先,我們需要一個owner屬性,來保存持有寫鎖的線程對象。

  2. 其次,由於寫鎖是可重入鎖,所以我們需要一個readCount來保存重入次數。

  3. 然後,由於讀鎖是可以有多個線程持有的,所以我們需要一個writeCount來保存讀鎖持有線程數。

  4. 最後,我們需要一個waiters屬性,來保存那些競爭鎖失敗后,還在等待(不死不休型)的線程對象。

自定義數據結構:

到這這裏,就不禁會有一個疑問。如何判斷嘗試獲取鎖的線程想要獲得的鎖是什麼類型的鎖。在API調用階段,我們可以根據API判斷。但是放入等待隊列后,我們如何判斷呢?如果還是如之前那樣,等待隊列只是保存競爭鎖的線程對象,是完全不夠的。

所以我們需要新建一個WaitNode的Class,用來保存等待隊列中線程對象及相關必要信息。所以,WaitNode會有如下屬性:

  • Thread thread:標識該等待者的線程。
  • int type:標識該線程對象希望競爭的鎖的類型。0表示寫鎖(獨佔鎖),1表示讀鎖(共享鎖)。
  • int arg:擴展參數。其實在手寫的簡易版,看不出來價值。但是實際AQS中的Node就是類似設計。不過AQS中,並不是採用queue保存Node,而是通過一個鏈表的方式保存Node。

類方法方面:

  • 獨佔鎖:
    • tryLock:與JarryReentrantLock類似,不過增加了兩點。一方面需要考量共享鎖是否被佔用。另一方面需要引入acquire參數(目前是固定值),呼應WaitNode的arg。
    • lock:與JarryReentrantLock類似,不過需要手動設置arg。
    • tryUnlock:與JarryReentrantLock類似,同樣需要引入release參數(目前是固定值),呼應WaitNode的arg。
    • unlock:與JarryReentrantLock類似,不過需要手動設置arg。
  • 共享鎖:
    • tryLockShared:嘗試獲取共享鎖,成功返回true,失敗返回false。其實和獨佔鎖的tryLock類似,只不過需要額外考慮獨佔鎖是否已經存在。另外為了實現鎖降級,如果獨佔鎖存在,需要判斷獨佔鎖的持有者與當前嘗試獲得共享鎖的線程是否一致。
    • lockShared:獲取共享鎖,直到成功。由於已經有了WaitNode.type,用於判斷鎖類型,所以共享鎖與獨佔鎖使用的是同一隊列。同樣的,這裏需要手動設置arg。其它方面與獨佔鎖的lock操作基本一致。
    • tryUnlockShared:嘗試釋放鎖,成功返回true,失敗返回false。類似於tryUnlock,只不過增加了release參數(固定值),呼應WaitNode的arg。
    • unlockShared:釋放鎖。類似unlock,不過需要手動設置arg。

JarryReentrantLock實現


    package tech.jarry.learning.netease;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * @Description:
     * @Author: jarry
     */
    public class JarryReadWriteLock {
    
        // 用於讀鎖(共享鎖)的鎖計數器   這裏真的有必要volatile嘛(Atomic中的value時volatile的),再看看後續代碼
        // 這裏確實不需要volatile,至於源碼,更過分,源碼是通過一個變量state的位運算實現readCount與writeCount
        volatile AtomicInteger readCount = new AtomicInteger(0);
        // 用於寫鎖(獨佔鎖)的鎖計數器   這裏之所以不用volatile是因為獨佔鎖,只有一個線程在改變writeCount(即使有緩存,也還是這個線程,所以不會因為緩存問題,導致問題)
        AtomicInteger writeCount = new AtomicInteger(0);
        // 用於保存鎖的持有者(這裏專指寫鎖(獨佔鎖)的鎖持有者)
        AtomicReference<Thread> owner = new AtomicReference<>();
        // 用於保存期望獲得鎖的線程(為了區分線程希望獲得的鎖的類型,這裏新建一個新的數據類型(通過內部類實現))
        public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<>();
    
        // 內部類實現等待隊列中的自定義數據類型
        class WaitNode{
            // 表示該等待者的線程
            Thread thread = null;
            // 表示希望爭取的鎖的類型。0表示寫鎖(獨佔鎖),1表示讀鎖(共享鎖)
            int type = 0;
            // 參數,acquire,狀態相關,再看看
            int arg = 0;
    
            public WaitNode(Thread thread, int type, int arg) {
                this.type = type;
                this.thread = thread;
                this.arg = arg;
            }
        }
    
        /**
         * 嘗試獲取獨佔鎖(針對獨佔鎖)
         * @param acquires 用於加鎖次數。一般傳入waitNode.arg(本代碼中就是1。為什麼不用一個常量1,就不知道了?)(可以更好的對接AQS)
         * @return
         */
        public boolean tryLock(int acquires){
            //TODO_FINISHED 這裏readCount的判斷,與修改writeCount的操作可以被割裂,並不是原子性的。不就有可能出現readCount與writeCount的值同時大於零的情況。
            // 該示例代碼,確實存在該問題,但實際源碼,writeCount與readCount是通過同一變量state實現的,所以可以很好地通過CAS確保原子性
    
            // readCount表示讀鎖(共享鎖)的上鎖次數
            if (readCount.get() == 0){
                // readCount的值為0,表示讀鎖(共享鎖)空置,所以當前線程是有可能獲得寫鎖(獨佔鎖)。
                // 接下來判斷寫鎖(獨佔鎖)是否被佔用
                int writeCountValue = writeCount.get();
                if (writeCountValue == 0){
                    // 寫鎖(獨佔鎖)的鎖次數為0,表示寫鎖(獨佔鎖)並沒未被任何線程持有
                    if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){
                        // 修改writeCount,來獲得鎖。該機制與ReentrantLock相同
                        // 設置獨享鎖的持有者owner
                        owner.set(Thread.currentThread());
                        // 至此,表示當前線程搶鎖成功
                        return true;
                    }
                } else {
                    // 寫鎖(獨佔鎖)的鎖次數不為0,表示寫鎖(獨佔鎖)已經被某線程持有
                    if (Thread.currentThread() == owner.get()){
                        // 如果持有鎖的線程為當前線程,那就進行鎖的重入操作
                        writeCount.set(writeCountValue+acquires);
                        // 重入鎖,表示當前線程是持有鎖的
                        return true;
                    }
                    // 讀鎖未被佔用,但寫鎖被佔用,且佔據寫鎖的線程不是當前線程
                }
            }
            // 讀鎖被佔據
            // 其它情況(1.讀鎖被佔據,2讀鎖未被佔用,但寫鎖被佔用,且佔據寫鎖的線程不是當前線程),都返回false
            return false;
        }
    
        /**
         * 獲取獨佔鎖(針對獨佔鎖)
         */
        public void lock(){
            // 設定waitNote中arg參數
            int arg = 1;
            // 嘗試獲取獨佔鎖。成功便退出方法,失敗,則進入“不死不休”邏輯
            if (!tryLock(arg)){
                // 需要將當前保存至等待隊列,在這之前,需要封裝當前線程為waitNote
                WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);
                // 將封裝好的waitNode放入等待隊列waiters中(offer方法會在隊列滿時,直接返回false。put則是阻塞。add則是拋出異常)
                waiters.offer(waitNode);
    
                // 如ReentrantLock一般,開始循環嘗試拿鎖
                while (true){
                    // 獲取隊列頭部元素
                    WaitNode headNote = waiters.peek();
                    // 如果等待隊列頭部元素headNote不為null(有可能是null嘛?),並且就是當前線程,那就嘗試獲取鎖
                    if (headNote !=null && headNote.thread == Thread.currentThread()){
                        // 如果再次嘗試獲取鎖失敗,那就只能掛起了
                        if (!tryLock(headNote.arg)){
                            LockSupport.park();
                        } else {
                            // 再次嘗試獲取鎖成功,那就將隊列頭部元素,踢出等待隊列waiters
                            waiters.poll();
                            return;
                        }
                    }else {
                        // 如果headNote不是當前線程的封裝,就直接掛起(這裏就沒處理headNote==null的情況)
                        LockSupport.park();
                    }
                }
            }
        }
    
        /**
         * 嘗試解鎖(針對獨佔鎖)
         * @param releases 用於設定解鎖次數。一般傳入waitNode.arg
         * @return
         */
        public boolean tryUnlock(int releases){
            // 首先判斷鎖的持有者是否為當前線程
            if (owner.get() != Thread.currentThread()){
                // 鎖的持有者不是當前線程(即使鎖的持有者為null,鎖的持有者是null,還解鎖,仍然是拋出異常)
                throw new IllegalMonitorStateException();
            }
            // 鎖的持有者就是當前線程
            // 首先按照releases進行解鎖(經過一番思考後,這裏不會出現類似DoubleCheck中的問題(Atomic中的value是volatile的),所以這個值同時只會有一個線程對其操作)
            int writeCountValue = writeCount.get();
            // 為writeCount設置新值
            writeCount.set(writeCountValue-releases);
            // 根據writeCount的新值,判斷鎖的持有者是否發生變化
            if (writeCount.get() == 0){
                // writeCount的值為0,表示當前線程已經完全解鎖,所以修改鎖的持有者為null
                owner.set(null);
                // 而這表示完全解鎖成功
                return true;
            } else {
                // writeCount的值不為0,表示當前線程尚未完全解鎖,故鎖的持有者未發生變化。即嘗試解鎖失敗
                return false;
            }
        }
    
        /**
         * 解鎖(針對獨佔鎖)
         */
        public void unlock(){
            // 設定tryUnlock的參數releases
            int arg = 1;
            // 先嘗試解鎖
            if (tryUnlock(arg)){
                // 獲得等待隊列的頭部元素
                WaitNode head = waiters.peek();
                // 檢測一下頭部元素head是否null(也許等待隊列根本就沒有元素)
                if (head == null){
                    // 如果頭部元素head為null,說明隊列為null,直接return
                    return;
                }
                // 解鎖成功,就要把等待隊列中的頭部元素喚醒(unpark)
                // 這裡有一點注意,即使隊列的頭元素head被喚醒了,也不一定就是這個頭元素head獲得鎖(詳見tryLock,新來的線程可能獲得鎖)
                // 如果這個頭元素無法獲得鎖,就會park(while循環嘛)。並且一次park,可以多次unpark(已實踐)
                LockSupport.unpark(head.thread);
            }
        }
    
        /**
         * 嘗試獲取共享鎖(針對共享鎖)
         * @param acquires
         * @return
         */
        public boolean tryLockShared(int acquires){
            // 判斷寫鎖(獨佔鎖)是否被別的線程持有(這個條件意味着:同一個線程可以同時持有讀鎖與寫鎖)
            // 該方法是為了進行  鎖降級******
            if (writeCount.get() == 0 || owner.get() == Thread.currentThread()){
                // 如果寫鎖(獨佔鎖)沒有別的被線程持有,就可以繼續嘗試獲取讀鎖(共享鎖)
                // 通過循環實現自旋,從而實現加鎖(避免加鎖失敗)
                while(true){
                    // 由於讀鎖(共享鎖)是共享的,不存在獨佔行為,故直接在writeCount增加當前線程加鎖行為的次數acquires
                    int writeCountValue = writeCount.get();
                    // 通過CAS進行共享鎖的次數的增加
                    if (writeCount.compareAndSet(writeCountValue, writeCountValue+acquires)){
                        break;
                    }
                }
            }
            // 寫鎖已經被別的線程持有,共享鎖獲取失敗
            return false;
        }
    
        /**
         * 獲取共享鎖(針對共享鎖)
         */
        public void lockShared(){
            // 設定waitNote中arg參數
            int arg = 1;
            // 判斷是否獲取共享鎖成功
            if (!tryLockShared(arg)){
                // 如果獲取共享鎖失敗,就進入等待隊列
                // 與獲取同步鎖操作一樣的,需要先對當前線程進行WaitNote的封裝
                WaitNode waitNode = new WaitNode(Thread.currentThread(),1,arg);
                // 將waitNote置入waiters(offer方法會在隊列滿時,直接返回false。put則是阻塞。add則是拋出異常)
                waiters.offer(waitNode);
    
                // 使用循環。一方面避免偽喚醒,另一方面便於二次嘗試獲取鎖
                while (true){
                    // 獲取等待隊列waiters的頭元素head
                    WaitNode head = waiters.peek();
                    // 校驗head是否為null,並判斷等待隊列的頭元素head是否為當前線程的封裝(也許head時當前線程的封裝,但並不意味着head就是剛剛放入waiters的元素)
                    if (head != null && head.thread == Thread.currentThread()){
                        // 如果校驗通過,並且等待隊列的頭元素head為當前線程的封裝,就再次嘗試獲取鎖
                        if (tryLockShared(head.arg)){
                            // 獲取共享鎖成功,就從當前隊列中移除head元素(poll()方法移除隊列頭部元素)
                            waiters.poll();
    
                            // 在此處就是與獨佔鎖不同的地方了,獨佔鎖意味着只可能有一個線程獲得鎖,而共享鎖是可以有多個線程獲得的
                            // 獲得等待隊列的新頭元素newHead
                            WaitNode newHead = waiters.peek();
                            // 校驗該元素是否為null,並判斷它的鎖類型是否為共享鎖
                            if (newHead != null && newHead.type == 1){
                                // 如果等待隊列的新頭元素是爭取共享鎖的,那麼就喚醒它(這是一個類似迭代的過程,剛喚醒的線程會會做出同樣的舉動)
                                //TODO_FINISHED 這裡有一點,我有些疑惑,那麼如果等待隊列是這樣的{共享鎖,共享鎖,獨佔鎖,共享鎖,共享鎖},共享鎖們被一個獨佔鎖隔開了。是不是就不能喚醒後面的共享鎖了。再看看後面的代碼
                                // 這個實際源碼,並不是這樣的。老師表示現有代碼是這樣的,不用理解那麼深入,後續有機會看看源碼
                                LockSupport.unpark(newHead.thread);
                            }
                        } else {
                            // 如果再次獲取共享鎖失敗,就掛起
                            LockSupport.park();
                        }
                    } else {
                        // 如果校驗未通過,或等待隊列的頭元素head不是當前線程的封裝,就掛起當前線程
                        LockSupport.park();
                    }
                }
            }
        }
    
        /**
         * 嘗試解鎖(針對共享鎖)
         * @param releases
         * @return
         */
        public boolean tryUnlockShared(int releases){
            // 通過CAS操作,減少共享鎖的鎖次數,即readCount的值(由於是共享鎖,所以是可能多個線程同時減少該值的,故採用CAS)
            while (true){
                // 獲取讀鎖(共享鎖)的值
                int readCountValue = readCount.get();
                int readCountNext = readCountValue - releases;
                // 只有成功修改值,才可以跳出
                if (readCount.compareAndSet(readCountValue,readCountNext)){
                    // 用於表明共享鎖完全解鎖成功
                    return readCountNext == 0;
                }
            }
            // 由於讀鎖沒有owner,所以不用進行有關owner的操作
        }
    
        /**
         * 解鎖(針對共享鎖)
         */
        public boolean unlockShared(){
            // 設定tryUnlockShared的參數releases
            int arg = 1;
            // 判斷是否嘗試解鎖成功
            if (tryUnlockShared(arg)){
                // 如果嘗試解鎖成功,就需要喚醒等待隊列的頭元素head的線程
                WaitNode head = waiters.peek();
                // 校驗head是否為null,畢竟可能等待隊列為null
                if (head != null){
                    // 喚醒等待隊列的頭元素head的線程
                    LockSupport.unpark(head.thread);
                }
                //TODO_FINISHED 嘗試共享鎖解鎖成功后,就應當返回true(雖然有些不大理解作用)
                // 用於對應源碼
                return true;
            }
            //TODO_FINISHED 嘗試共享鎖解鎖失敗后,就應當返回false(雖然有些不大理解作用)
            // 用於對應源碼
            return false;
        }
    }

這裏同樣不進行相關解釋了。因為需要的解釋,在註釋中都寫的很明確了,包括我踩的一些坑。

如果依舊有一些看不懂的地方,或者錯誤的地方,歡迎@我,或者私信我。

四,總結

技術

  • CAS:通過CAS實現鎖持有數量等的原子性操作,從而完成鎖的競爭操作。
  • Atomic:為了簡化操作(避免自己獲取Unsafe,offset等),通過Atomic實現CAS 操作。
  • volatile:為了避免多線程下的可見性問題,採用了volatile的no cache特性。
  • transient:可以避免對應變量序列化,源碼中有採用。不過考慮后,並沒有使用。
  • while:一方面通過while避免偽喚醒問題,另一方面,通過while推動流程(這個需要看代碼)。
  • LinkedBlockingQueue:實現線程等待隊列。實際的AQS是通過Node構成鏈表結構的。
  • LockSupport:通過LockSupport實現線程的掛起,喚醒等操作。
  • IllegalMonitorStateException:就是一個異常類型,仿Synchronized的,起碼看起來更明確,還不用自己實現新的Exception類型。

方案

其實,這兩個demo有兩個重要的方面。一方面是可以親自感受,一個鎖是怎麼實現的,它的方案是怎樣的。另一方面就是去思量,其中有關原子性,以及可見性的思量與設計。

你們可以嘗試改動一些東西,然後去考慮,這樣改動后,是否存在線程安全問題。這樣的考慮對自己在線程安全方面的提升是巨大的。反正我當時那一周,就不斷的改來改去。甚至有些改動,根本調試不出來問題,然後諮詢了別人,才知道其中的一些坑。當然也有一些改動是可以的。

後言

如果有問題,可以@我,或者私信我。

如果覺得這篇文章不錯的話,請點擊推薦。這對我,以及那些需要的人,很重要。

謝謝。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

平板收購,iphone手機收購,二手筆電回收,二手iphone收購-全台皆可收購

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益