精美圖文講解Java AQS 共享式獲取同步狀態以及Semaphore的應用

| 好看請贊,養成習慣

  • 你有一個思想,我有一個思想,我們交換后,一個人就有兩個思想

  • If you can NOT explain it simply, you do NOT understand it well enough

現陸續將Demo代碼和技術文章整理在一起 Github實踐精選 ,方便大家閱讀查看,本文同樣收錄在此,覺得不錯,還請Star

看到本期內容這麼少,是不是心動了呢?

前言

上一篇萬字長文 Java AQS隊列同步器以及ReentrantLock的應用 為我們讀 JUC 源碼以及其設計思想做了足夠多的鋪墊,接下來的內容我將重點說明差異化,如果有些童鞋不是能很好的理解文中的一些內容,強烈建議回看上一篇文章,搞懂基礎內容,接下來的閱讀真會輕鬆加愉快

AQS 中我們介紹了獨佔式獲取同步狀態的多種情形:

  • 獨佔式獲取鎖
  • 可響應中斷的獨佔式獲取鎖
  • 有超時限制的獨佔式獲取鎖

AQS 提供的模版方法裏面還差共享式獲取同步狀態沒有介紹,所以我們今天來揭開這個看似神秘的面紗

AQS 中的共享式獲取同步狀態

獨佔式是你中沒我,我中沒你的的一種互斥形式,共享式顯然就不是這樣了,所以他們的唯一區別就是:

同一時刻能否有多個線程同時獲取到同步狀態

簡單來說,就是這樣滴:

我們知道同步狀態 state 是維護在 AQS 中的,拋開可重入鎖的概念,我在上篇文章中也提到了,獨佔式和共享式控制同步狀態 state 的區別僅僅是這樣:

所以說想了解 AQS 的 xxxShared 的模版方法,只需要知道它是怎麼控制 state 的就好了

AQS共享式獲取同步狀態源碼分析

為了幫助大家更好的回憶內容,我將上一篇文章的兩個關鍵內容粘貼在此處,幫助大家快速回憶,關於共享式,大家只需要關注【騷紫色】就可以了

自定義同步器需要重寫的方法

AQS 提供的模版方法

故事就從這裏說起吧 (你會發現和獨佔式驚人的相似),關鍵代碼都加了註釋

    public final void acquireShared(int arg) {
      	// 同樣調用自定義同步器需要重寫的方法,非阻塞式的嘗試獲取同步狀態,如果結果小於零,則獲取同步狀態失敗
        if (tryAcquireShared(arg) < 0)
          	// 調用 AQS 提供的模版方法,進入等待隊列
            doAcquireShared(arg);
    }

進入 doAcquireShared 方法:

    private void doAcquireShared(int arg) {
      	// 創建共享節點「SHARED」,加到等待隊列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
          	// 進入“自旋”,這裏並不是純粹意義上的死循環,在獨佔式已經說明過
            for (;;) {
              	// 同樣嘗試獲取當前節點的前驅節點
                final Node p = node.predecessor();
              	// 如果前驅節點為頭節點,嘗試再次獲取同步狀態
                if (p == head) {
                  	// 在此以非阻塞式獲取同步狀態
                    int r = tryAcquireShared(arg);
                  	// 如果返回結果大於等於零,才能跳出外層循環返回
                    if (r >= 0) {
                      	// 這裡是和獨佔式的區別
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面代碼第 18 行我們提到和獨佔式獲取同步狀態的區別,貼心的給大家一個更直觀的對比:

差別只在這裏,所以我們就來看看 setHeadAndPropagate(node, r) 到底幹了什麼,我之前說過 JDK 源碼中的方法命名絕大多數還是非常直觀的,該方法直譯過來就是 【設置頭並且傳播/繁衍】。獨佔式只是設置了頭,共享式除了設置頭還多了一個傳播,你的疑問應該已經來了:

啥是傳播,為什麼會有傳播這個設置呢?

想了解這個問題,你需要先知道非阻塞共享式獲取同步狀態返回值的含義:

這裏說的傳播其實說的是 propagate > 0 的情況,道理也很簡單,當前線程獲取同步狀態成功了,還有剩餘的同步狀態可用於其他線程獲取,那就要通知在等待隊列的線程,讓他們嘗試獲取剩餘的同步狀態

如果要讓等待隊列中的線程獲取到通知,需要線程調用 release 方法實現的。接下來,我們走近 setHeadAndPropagate 一探究竟,驗證一下

  // 入參,node: 當前節點
	// 入參,propagate:獲取同步狀態的結果值,即上面方法中的變量 r
	private void setHeadAndPropagate(Node node, int propagate) {
    		// 記錄舊的頭部節點,用於下面的check
        Node h = head; 
    		// 將當前節點設置為頭節點
        setHead(node);
        
    		// 通過 propagate 的值和 waitStatus 的值來判斷是否可以調用 doReleaseShared 方法
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
          	// 如果後繼節點為空或者後繼節點為共享類型,則進行喚醒後繼節點
    				// 這裏後繼節點為空意思是只剩下當前頭節點了,另外這裏的 s == null 也是判斷空指針的標準寫法
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

上面方法的大方向作用我們了解了,但是代碼中何時調用 doReleaseShared 的判斷邏輯還是挺讓人費解的,為什麼會有這麼一大堆的判斷,我們來逐個分析一下:

這裏的空判斷有點讓人頭大,我們先挑出來說明一下:

排除了其他判斷條件的干擾,接下來我們就專註分析 propagate 和 waitStatus 兩個判斷條件就可以了,這裏再將 waitStatus 的幾種狀態展示在這裏,幫助大家理解,【騷粉色】是我們一會要用到的:

propagate > 0

上面已經說過了,如果成立,直接短路後續判斷,然後根據 doReleaseShared 的判斷條件進行釋放

propagate > 0 不成立, h.waitStatus < 0 成立 (注意這裏的h是舊的頭節點)

什麼時候 h.waitStatus < 0 呢?拋開 CONDITION 的使用,只剩下 SIGNAL 和 PROPAGATE,想知道這個答案,需要提前看一下 doReleaseShared() 方法了:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                  	// CAS 將頭節點的狀態設置為0                
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 設置成功后才能跳出循環喚醒頭節點的下一個節點
                  	unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         // 將頭節點狀態CAS設置成 PROPAGATE 狀態
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

doReleaseShared() 方法中可以看出:

  • 如果讓 h.waitStatus < 0 成立,只能將其設置成 PROPAGATE = -3 的情況,設置成功的前提是 h 頭節點 expected 的狀態是 0;

  • 如果 h.waitStatus = 0,是上述代碼第 8 行 CAS 設置成功,然後喚醒等待中的線程

所以猜測,當前線程執行到 h.waitStatus < 0 的判斷前,有另外一個線程剛好執行了 doReleaseShared() 方法,將 waitStatus 又設置成PROPAGATE = -3

這個理解有點繞,我們還是來畫個圖理解一下吧:

可能有同學還是不太能理解這麼寫的道理,我們一直說 propagate <> = 0 的情況,propagate = 0 代表的是當時/當時/當時 嘗試獲取同步狀態沒成功,但是之後可能又有共享狀態被釋放了,所以上面的邏輯是以防這種萬一,你懂的,嚴謹的併發就是要防止一切萬一,現在結合這個情景再來理解上面的判斷你是否豁然開朗了呢?

繼續向下看,

前序條件不成立,(h = head) == null || h.waitStatus < 0 注意這裏的h是新的頭節點)

有了上面鋪墊,這個就直接畫個圖就更好理解啦,其實就是沒有那麼巧有另外一個線程摻合了

相信到這裏你應該理解共享式獲取同步狀態的全部過程了吧,至於非阻塞共享式獲取同步狀態帶有超時時間獲取同步狀態,結合本文講的 setHeadAndPropagate 邏輯和獨佔式獲取同步狀態的實現過程過程來看,真是一毛一樣,這裏就不再累述了,趕緊打開你的 IDE 去驗證一下吧

我們分析了AQS 的模版方法,還一直沒說 tryAcquireShared(arg) 這個方法是如何被重寫的,想要了解這個,我們就來看一看共享式獲取同步狀態的經典應用 Semaphore

Semaphore 的應用及源碼分析

Semaphore 概念

Semaphore 中文多翻譯為 【信號量】,我還特意查了一下劍橋辭典的英文解釋:

其實就是信號標誌(two flags),比如紅綠燈,每個交通燈產生兩種不同行為

  • Flag1-紅燈:停車
  • Flag2-綠燈:行車

在 Semaphore 裏面,什麼時候是紅燈,什麼時候是綠燈,其實就是靠 tryAcquireShared(arg) 的結果來表示的

  • 獲取不到共享狀態,即為紅燈
  • 獲取到共享狀態,即為綠燈

所以我們走近 Semaphore ,來看看它到底是怎麼應用 AQS 的,又是怎樣重寫 tryAcquireShared(arg) 方法的

Semaphore 源碼分析

先看一下類結構

看到這裏你是否有點跌眼鏡,和 ReentrantLock 相似的可怕吧,如果你有些陌生,再次強烈建議你回看上一篇文章 Java AQS隊列同步器以及ReentrantLock的應用 ,這裏直接提速對比看公平和非公平兩種重寫的 tryAcquireShared(arg) 方法,沒有意外,公平與否,就是判斷是否有前驅節點

方法內部只是計算 state 的剩餘值,那 state 的初始值是多少怎麼設置呢?當然也就是構造方法了:

		public Semaphore(int permits) {
      	// 默認仍是非公平的同步器,至於為什麼默認是非公平的,在上一篇文章中也特意說明過
        sync = new NonfairSync(permits);
    }
    
    NonfairSync(int permits) {
    		super(permits);
    }

super 方法,就會將初始值給到 AQS 中的 state

也許你發現了,當我們把 permits 設置為1 的時候,不就是 ReentrantLock 的互斥鎖了嘛,說的一點也沒錯,我們用 Semaphore 也能實現基本互斥鎖的效果


static int count;
//初始化信號量
static final Semaphore s 
    = new Semaphore(1);
//用信號量保證互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

But(英文聽力中的重點),Semaphore 肯定不是為這種特例存在的,它是共享式獲取同步狀態的一種實現。如果使用信號量,我們通常會將 permits 設置成大於1的值,不知道你是否還記得我曾在 為什麼要使用線程池? 一文中說到的池化概念,在同一時刻,允許多個線程使用連接池,每個連接被釋放之前,不允許其他線程使用。所以說 Semaphore 可以允許多個線程訪問一個臨界區,最終很好的做到一個限流/限流/限流 的作用

雖然 Semaphore 能很好的提供限流作用,說實話,Semaphore 的限流作用比較單一,我在實際工作中使用 Semaphore 並不是很多,如果真的要用高性能限流器,Guava RateLimiter 是一個非常不錯的選擇,我們後面會做分析,有興趣的可以提前了解一下

關於 Semaphore 源碼,就這麼三下五除二的結束了

總結

不知你有沒有感覺到,我們的節奏明顯加快了,好多原來分散的點在被瘋狂的串聯起來,如果按照這個方式來閱讀 JUC 源碼,相信你也不會一頭扎進去迷失方向,然後沮喪的退出 JUC 吧,然後面試背誦答案,然後忘記,然後再背誦?

跟上節奏,關於共享式獲取同步狀態,Semaphore 只不過是非常經典的應用,ReadWriteLock 和 CountDownLatch 日常應用還是非常廣泛的,我們接下來就陸續聊聊它們吧

靈魂追問

  1. Semaphore 的 permits 設置成1 “等同於” 簡單的互斥鎖實現,那它和 ReentrantLock 的區別還是挺大的,都有哪些區別呢?
  2. 你在項目中是如何使用 Semaphore 的呢?

參考

  1. Java 併發實戰
  2. Java 併發編程的藝術
  3. https://blog.csdn.net/anlian523/article/details/106319294

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案