更新時(shí)間:2022-08-25 來(lái)源:黑馬程序員 瀏覽量:
并發(fā)計(jì)數(shù)器各個(gè)方案介紹
方案概述
1. jdk5提供的原子更新長(zhǎng)整型類(lèi) AtomicLong
2. synchronized
3. jdk8提供的 LongAdder 【單機(jī)推薦】
4. Redisson分布式累加器【分布式推薦】
方案介紹
jdk5提供的原子更新長(zhǎng)整型類(lèi) AtomicLong
在JDK1.5開(kāi)始就新增了并發(fā)的Integer/Long的操作工具類(lèi)AtomicInteger和AtomicLong。
AtomicLong 利用底層操作系統(tǒng)的CAS來(lái)保證原子性,在一個(gè)死循環(huán)內(nèi)不斷執(zhí)行CAS操作,直到操作成功。不過(guò),CAS操作的一個(gè)問(wèn)題是在并發(fā)量比較大的時(shí)候,可能很多次的執(zhí)行CAS操作都不成功,這樣性能就受到較大影響。
示例代碼
AtomicLong value = new AtomicLong(0); //定義 incrementAndGet(); //遞增1 ```
synchronized
synchronized是一個(gè)重量級(jí)鎖,主要是因?yàn)榫€(xiàn)程競(jìng)爭(zhēng)鎖會(huì)引起操作系統(tǒng)用戶(hù)態(tài)和內(nèi)核態(tài)切換,浪費(fèi)資源效率不高,在jdk1.5之前,synchronized沒(méi)有做任何優(yōu)化,但在jdk1.6做了性能優(yōu)化,它會(huì)經(jīng)歷偏向鎖,輕量級(jí)鎖,最后才到重量級(jí)鎖這個(gè)過(guò)程,在性能方面有了很大的提升,在jdk1.7的ConcurrentHashMap是基于ReentrantLock的實(shí)現(xiàn)了鎖,但在jdk1.8之后又替換成了synchronized,就從這一點(diǎn)可以看出JVM團(tuán)隊(duì)對(duì)synchronized的性能還是挺有信心的。下面我們分別來(lái)介紹下無(wú)鎖,偏向鎖,輕量級(jí)鎖,重量級(jí)鎖。
jdk8提供的 LongAdder 【單機(jī)推薦】
在JDK8中又新增了LongAdder,這是一個(gè)針對(duì)Long類(lèi)型的數(shù)據(jù)的操作工具類(lèi)。
那我們知道,在ConcurrentHashMap中,對(duì)Map分割成多個(gè)segment,這樣多個(gè)Segment的操作就可以并行執(zhí)行,從而可以提高性能。在JDK8中,LongAdder與ConcurrentHashMap類(lèi)似,將內(nèi)部操作數(shù)據(jù)value分離成一個(gè)Cell數(shù)組,每個(gè)線(xiàn)程訪(fǎng)問(wèn)時(shí),通過(guò)Hash等算法映射到其中一個(gè)Cell上。
計(jì)算最終的數(shù)據(jù)結(jié)果,則是各個(gè)Cell數(shù)組的累計(jì)求和。
LongAddr常用api方法
add(): //增加指定的數(shù)值; increament(): //增加1; decrement(): //減少1; intValue(); //intValue();/floatValue()/doubleValue():得到最終計(jì)數(shù)后的結(jié)果 sum()://求和,得到最終計(jì)數(shù)結(jié)果 sumThenReset()://求和得到最終計(jì)數(shù)結(jié)果,并重置value。 ```
Redisson分布式累加器【分布式推薦】
基于Redis的Redisson分布式整長(zhǎng)型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類(lèi)似的接口。通過(guò)利用客戶(hù)端內(nèi)置的LongAdder對(duì)象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計(jì)其性能最高比分布式AtomicLong對(duì)象快 10000 倍以上。
RLongAddr itheimaLongAddr = redission.getLongAddr("itheimaLongAddr"); itheimaLongAddr.add(100); //添加指定數(shù)量 itheimaLongAddr.increment(); //遞增1 itheimaLongAddr.increment(); //遞減1 itheimaLongAddr.sum(); //聚合求和 ```
基于Redis的Redisson分布式雙精度浮點(diǎn)累加器(DoubleAdder)采用了與java.util.concurrent.atomic.DoubleAdder類(lèi)似的接口。通過(guò)利用客戶(hù)端內(nèi)置的DoubleAdder對(duì)象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計(jì)其性能最高比分布式AtomicDouble對(duì)象快 12000 倍。
示例代碼
RLongDouble itheimaDouble = redission.getLongDouble("itheimaLongDouble"); itheimaDouble.add(100); //添加指定數(shù)量 itheimaDouble.increment(); //遞增1 itheimaDouble.increment(); //遞減1 itheimaDouble.sum(); //聚合求和 ```
以上【整長(zhǎng)型累加器】和【雙精度浮點(diǎn)累加器】完美適用于分布式統(tǒng)計(jì)計(jì)量場(chǎng)景。
各個(gè)方案性能測(cè)試
測(cè)試代碼
``` package com.itheima._01性能比較; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; /** * @author 黑馬程序員 */ public class CountTest { private int count = 0; @Test public void startCompare() { compareDetail(1, 100 * 10000); compareDetail(20, 100 * 10000); compareDetail(30, 100 * 10000); compareDetail(40, 100 * 10000); compareDetail(100, 100 * 10000); } /** * @param threadCount 線(xiàn)程數(shù) * @param times 每個(gè)線(xiàn)程增加的次數(shù) */ public void compareDetail(int threadCount, int times) { try { System.out.println(String.format("threadCount: %s, times: %s", threadCount, times)); long start = System.currentTimeMillis(); testSynchronized(threadCount, times); System.out.println("testSynchronized cost: " + (System.currentTimeMillis() - start)); start = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("testAtomicLong cost: " + (System.currentTimeMillis() - start)); start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("testLongAdder cost: " + (System.currentTimeMillis() - start)); System.out.println(); } catch (Exception e) { e.printStackTrace(); } } public void testSynchronized(int threadCount, int times) throws InterruptedException { List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { threadList.add(new Thread(()-> { for (int j = 0; j < times; j++) { add(); } })); } for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } } public synchronized void add() { count++; } public void testAtomicLong(int threadCount, int times) throws InterruptedException { AtomicLong count = new AtomicLong(); List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { threadList.add(new Thread(()-> { for (int j = 0; j < times; j++) { count.incrementAndGet(); } })); } for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } } public void testLongAdder(int threadCount, int times) throws InterruptedException { LongAdder count = new LongAdder(); List<Thread> threadList = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { threadList.add(new Thread(()-> { for (int j = 0; j < times; j++) { count.increment(); } })); } for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } } } ```
運(yùn)行結(jié)果
threadCount: 1, times: 1000000 testSynchronized cost: 69 testAtomicLong cost: 16 testLongAdder cost: 15 threadCount: 20, times: 1000000 testSynchronized cost: 639 testAtomicLong cost: 457 testLongAdder cost: 59 threadCount: 30, times: 1000000 testSynchronized cost: 273 testAtomicLong cost: 538 testLongAdder cost: 70 threadCount: 40, times: 1000000 testSynchronized cost: 312 testAtomicLong cost: 717 testLongAdder cost: 81 threadCount: 100, times: 1000000 testSynchronized cost: 719 testAtomicLong cost: 2098 testLongAdder cost: 225 ```
結(jié)論
并發(fā)量比較低的時(shí)候AtomicLong優(yōu)勢(shì)比較明顯,因?yàn)锳tomicLong底層是一個(gè)樂(lè)觀(guān)鎖,不用阻塞線(xiàn)程,不斷cas即可。但是在并發(fā)比較高的時(shí)候用synchronized比較有優(yōu)勢(shì),因?yàn)榇罅烤€(xiàn)程不斷cas,會(huì)導(dǎo)致cpu持續(xù)飆高,反而會(huì)降低效率
LongAdder無(wú)論并發(fā)量高低,優(yōu)勢(shì)都比較明顯。且并發(fā)量越高,優(yōu)勢(shì)越明顯
原理分析
AtomicLong 實(shí)現(xiàn)原子操作原理
非原子操作示例代碼
package com.itheima._02Unsafe測(cè)試; import java.util.ArrayList; import java.util.List; /** * @author 黑馬程序員 */ public class Test1 { private int value = 0; public static void main(String[] args) throws InterruptedException { Test1 test1 = new Test1(); test1.increment(); System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value); //結(jié)果,期待值:10000,最終結(jié)果值:xxxx } private void increment() throws InterruptedException { List<Thread> list = new ArrayList<>(); //啟動(dòng)100個(gè)線(xiàn)程,每個(gè)線(xiàn)程對(duì)value進(jìn)行累加100次 for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { for (int j = 0; j < 100; j++) { value++; } }); list.add(t); t.start(); } //保證所有線(xiàn)程運(yùn)行完成 for (Thread thread : list) { thread.join(); } } } ```
運(yùn)行效果
結(jié)論
> 可以發(fā)現(xiàn)輸出的結(jié)果值錯(cuò)誤,這是因?yàn)?`value++` 不是一個(gè)原子操作,它將 `value++` 拆分成了 3 個(gè)步驟 `load、add、store`,多線(xiàn)程并發(fā)有可能上一個(gè)線(xiàn)程 add 過(guò)后還沒(méi)有 store 下一個(gè)線(xiàn)程又執(zhí)行了 load 了這種重復(fù)造成得到的結(jié)果可能比最終值要小。
AtomicLong是JDK1.5提供的原子操作示例代碼
package com.itheima._03AtomicLong的CAS原子操作示例; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * @author 黑馬程序員 */ public class Test2 { private AtomicLong value = new AtomicLong(0); public static void main(String[] args) throws InterruptedException { Test2 test1 = new Test2(); test1.increment(); System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value); //結(jié)果,期待值:10000,最終結(jié)果值:10000 } private void increment() throws InterruptedException { List<Thread> list = new ArrayList<>(); //啟動(dòng)100個(gè)線(xiàn)程,每個(gè)線(xiàn)程對(duì)value進(jìn)行累加100次 for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { for (int j = 0; j < 100; j++) { value.incrementAndGet(); } }); list.add(t); t.start(); } //保證所有線(xiàn)程運(yùn)行完成 for (Thread thread : list) { thread.join(); } } } ```
運(yùn)行效果
AtomicLong CAS原理介紹
1.使用volatile保證內(nèi)存可見(jiàn)性,獲取主存中最新的操作數(shù)據(jù)
2.使用CAS(Compare-And-Swap)操作保證數(shù)據(jù)原子性
CAS算法是jdk對(duì)并發(fā)操作共享數(shù)據(jù)的支持,包含了3個(gè)操作數(shù)
第一個(gè)操作數(shù):內(nèi)存值value(V)
第二個(gè)操作數(shù):預(yù)估值expect(O)
第三個(gè)操作數(shù):更新值new(N)
含義:CAS比較交換的過(guò)程可以通俗的理解為CAS(V,O,N),包含三個(gè)值分別為:V 內(nèi)存地址(主存)存放的實(shí)際值;O 預(yù)期的值(舊值);N 更新的新值。當(dāng)V和O相同時(shí),也就是說(shuō)舊值和內(nèi)存中實(shí)際的值相同表明該值沒(méi)有被其他線(xiàn)程更改過(guò),即該舊值O就是目前來(lái)說(shuō)最新的值了,自然而然可以將新值N賦值給V;當(dāng)V和O不相同時(shí),會(huì)一致循環(huán)下去直至修改成功。
AtomicLong底層CAS實(shí)現(xiàn)原子操作原理
查看incrementAndGet()方法源碼
public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } ``` getAndAddLong方法源碼 ```java public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } ```
> 這里是一個(gè)循環(huán)CAS操作
compareAndSwapLong方法源碼
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); ```
我們發(fā)現(xiàn)調(diào)用的是 native 的 `unsafe.compareAndSwapLong(Object obj, long valueOffset, Long expect, Long update)`,我們翻看 Hotspot 源碼發(fā)現(xiàn)在 unsafe.cpp 中定義了這樣一段代碼
> Unsafe中基本都是調(diào)用native方法,那么就需要去JVM里面找對(duì)應(yīng)的實(shí)現(xiàn)。
>
> 到`http://hg.openjdk.java.net/` 進(jìn)行一步步選擇下載對(duì)應(yīng)的hotspot版本,我這里下載的是`http://hg.openjdk.java.net/jdk8u/jdk8u60/hotspot/archive/tip.tar.gz`,
>
> 然后解hotspot目錄,發(fā)現(xiàn) `\src\share\vm\prims\unsafe.cpp`,這個(gè)就是對(duì)應(yīng)jvm相關(guān)的c++實(shí)現(xiàn)類(lèi)了。
>
> 比如我們對(duì)CAS部分的實(shí)現(xiàn)很感興趣,就可以在該文件中搜索compareAndSwapInt,此時(shí)可以看到對(duì)應(yīng)的JNI方法為`Unsafe_CompareAndSwapInt`
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x)) UnsafeWrapper("Unsafe_CompareAndSwapLong"); Handle p (THREAD, JNIHandles::resolve(obj)); jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset)); #ifdef SUPPORTS_NATIVE_CX8 return (jlong)(Atomic::cmpxchg(x, addr, e)) == e; #else if (VM_Version::supports_cx8()) return (jlong)(Atomic::cmpxchg(x, addr, e)) == e; else { jboolean success = false; MutexLockerEx mu(UnsafeJlong_lock, Mutex::_no_safepoint_check_flag); jlong val = Atomic::load(addr); if (val == e) { Atomic::store(x, addr); success = true; } return success; } #endif UNSAFE_END ```
Atomic::cmpxchg c++源碼
可以看到調(diào)用了“Atomic::cmpxchg”方法,“Atomic::cmpxchg”方法在linux_x86和windows_x86的實(shí)現(xiàn)如下。
linux_x86的實(shí)現(xiàn):
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; } ```
windows_x86的實(shí)現(xiàn)(c++源文件):
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange int mp = os::is_MP(); __asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp) cmpxchg dword ptr [edx], ecx } } ```
Atomic::cmpxchg方法解析:
mp是“os::is_MP()”的返回結(jié)果,“os::is_MP()”是一個(gè)內(nèi)聯(lián)函數(shù),用來(lái)判斷當(dāng)前系統(tǒng)是否為多處理器。
如果當(dāng)前系統(tǒng)是多處理器,該函數(shù)返回1。
否則,返回0。
LOCK_IF_MP(mp)會(huì)根據(jù)mp的值來(lái)決定是否為cmpxchg指令添加lock前綴。
如果通過(guò)mp判斷當(dāng)前系統(tǒng)是多處理器(即mp值為1),則為cmpxchg指令添加lock前綴。
否則,不加lock前綴。
這是一種優(yōu)化手段,認(rèn)為單處理器的環(huán)境沒(méi)有必要添加lock前綴,只有在多核情況下才會(huì)添加lock前綴,因?yàn)閘ock會(huì)導(dǎo)致性能下降。cmpxchg是匯編指令,作用是比較并交換操作數(shù)。
> 底層會(huì)調(diào)用cmpxchg匯編指令,如果是多核處理器會(huì)加鎖實(shí)現(xiàn)原子操作
反匯編指令查詢(xún)
查看java程序運(yùn)行的匯編指令資料
將上圖2個(gè)文件拷貝到j(luò)re\bin目錄下,如下圖
配置運(yùn)行參數(shù)
```
-server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*
```
運(yùn)行Test2效果
synchronized 實(shí)現(xiàn)同步操作原理
鎖對(duì)象
java中任何一個(gè)對(duì)象都可以稱(chēng)為鎖對(duì)象,原因在于java對(duì)象在內(nèi)存中存儲(chǔ)結(jié)構(gòu),如下圖所示:
在對(duì)象頭中主要存儲(chǔ)的主要是一些運(yùn)行時(shí)的數(shù)據(jù),如下所示:
其中 在Mark Work中存儲(chǔ)著該對(duì)象作為鎖時(shí)的一些信息,如下所示是Mark Work中在64位系統(tǒng)中詳細(xì)信息:
偏向鎖
在無(wú)競(jìng)爭(zhēng)環(huán)境中(沒(méi)有并發(fā))使用一種鎖
> 偏向鎖的作用是當(dāng)有線(xiàn)程訪(fǎng)問(wèn)同步代碼或方法時(shí),線(xiàn)程只需要判斷對(duì)象頭的Mark Word中判斷一下是否有偏向鎖指向線(xiàn)程ID.
>
> 偏向鎖記錄過(guò)程
>
> - 線(xiàn)程搶到了對(duì)象的同步鎖(鎖標(biāo)志為01參考上圖即無(wú)其他線(xiàn)程占用)
> - 對(duì)象Mark World 將是否偏向標(biāo)志位設(shè)置為1
> - 記錄搶到鎖的線(xiàn)程ID
> - 進(jìn)入偏向狀態(tài)
輕量級(jí)鎖
當(dāng)有另外一個(gè)線(xiàn)程競(jìng)爭(zhēng)獲取這個(gè)鎖時(shí),由于該鎖已經(jīng)是偏向鎖,當(dāng)發(fā)現(xiàn)對(duì)象頭 Mark Word 中的線(xiàn)程 ID 不是自己的線(xiàn)程 ID,就會(huì)進(jìn)行 CAS 操作獲取鎖,**如果獲取成功**,直接替換 Mark Word 中的線(xiàn)程 ID 為自己的 ID,該鎖會(huì)保持偏向鎖狀態(tài);**如果獲取鎖失敗**,代表當(dāng)前鎖有一定的競(jìng)爭(zhēng),偏向鎖將升級(jí)為輕量級(jí)鎖。
- 舉個(gè)例子來(lái)說(shuō)明一下什么時(shí)候需要升級(jí)偏向鎖
假設(shè)A線(xiàn)程 持有鎖 X(此時(shí)X是偏向鎖) 這是有個(gè)B線(xiàn)程也同樣用到了鎖X,而B(niǎo)線(xiàn)程在檢查鎖對(duì)象的Mark World時(shí)發(fā)現(xiàn)偏向鎖的線(xiàn)程ID已經(jīng)指向了線(xiàn)程A。這時(shí)候就需要升級(jí)鎖X為輕量級(jí)鎖。輕量級(jí)鎖意味著標(biāo)示該資源現(xiàn)在處于競(jìng)爭(zhēng)狀態(tài)。
當(dāng)有其他線(xiàn)程想訪(fǎng)問(wèn)加了輕量級(jí)鎖的資源時(shí),會(huì)使用自旋鎖優(yōu)化,來(lái)進(jìn)行資源訪(fǎng)問(wèn)。
> 自旋策略
>
> JVM 提供了一種自旋鎖,可以通過(guò)自旋方式不斷嘗試獲取鎖,從而避免線(xiàn)程被掛起阻塞。這是基于大多數(shù)情況下,線(xiàn)程持有鎖的時(shí)間都不會(huì)太長(zhǎng),畢竟線(xiàn)程被掛起阻塞可能會(huì)得不償失。
>
> 從 JDK1.7 開(kāi)始,自旋鎖默認(rèn)啟用,自旋次數(shù)由 JVM 設(shè)置決定,這里我不建議設(shè)置的重試次數(shù)過(guò)多,因?yàn)?CAS 重試操作意味著長(zhǎng)時(shí)間地占用 CPU。自旋鎖重試之后如果搶鎖依然失敗,同步鎖就會(huì)升級(jí)至重量級(jí)鎖,鎖標(biāo)志位改為 10。在這個(gè)狀態(tài)下,未搶到鎖的線(xiàn)程都會(huì)進(jìn)入 Monitor,之后會(huì)被阻塞在 _WaitSet 隊(duì)列中。
重量級(jí)鎖
自旋失敗,很大概率 再一次自選也是失敗,因此直接升級(jí)成重量級(jí)鎖,進(jìn)行線(xiàn)程阻塞,減少cpu消耗。
當(dāng)鎖升級(jí)為重量級(jí)鎖后,未搶到鎖的線(xiàn)程都會(huì)被阻塞,進(jìn)入阻塞隊(duì)列。
重量級(jí)鎖在高并發(fā)下性能就會(huì)變慢,因?yàn)樗袥](méi)有獲取鎖的線(xiàn)程會(huì)進(jìn)行阻塞等待,到獲取鎖的時(shí)候被喚醒,這些操作都是消耗很多資源。
輕量級(jí)鎖膨脹流程圖
LongAdder 實(shí)現(xiàn)原子操作原理
LongAdder實(shí)現(xiàn)高并發(fā)計(jì)數(shù)實(shí)現(xiàn)思路
LongAdder實(shí)現(xiàn)高并發(fā)的秘密就是用空間換時(shí)間,對(duì)一個(gè)值的cas操作,變成對(duì)多個(gè)值的cas操作,當(dāng)獲取數(shù)量的時(shí)候,對(duì)這多個(gè)值加和即可。
測(cè)試代碼
``` package com.itheima._04LongAddr使用測(cè)試; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.LongBinaryOperator; /** * @author 黑馬程序員 */ public class Test3 { private LongAdder value = new LongAdder(); //默認(rèn)初始值0 public static void main(String[] args) throws InterruptedException { Test3 test1 = new Test3(); test1.increment(); System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value.sum()); //結(jié)果,期待值:10000,最終結(jié)果值:10000 } private void increment() throws InterruptedException { List<Thread> list = new ArrayList<>(); //啟動(dòng)100個(gè)線(xiàn)程,每個(gè)線(xiàn)程對(duì)value進(jìn)行累加100次 for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { for (int j = 0; j < 100; j++) { value.increment(); } }); list.add(t); t.start(); } //保證所有線(xiàn)程運(yùn)行完成 for (Thread thread : list) { thread.join(); } } } ```
源碼分析
1. 先對(duì)base變量進(jìn)行cas操作,cas成功后返回
2. 對(duì)線(xiàn)程獲取一個(gè)hash值(調(diào)用getProbe),hash值對(duì)數(shù)組長(zhǎng)度取模,定位到cell數(shù)組中的元素,對(duì)數(shù)組中的元素進(jìn)行cas
增加數(shù)量源碼
public void increment() { add(1L); } ``` ```java public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } ```
當(dāng)數(shù)組不為空,并且根據(jù)線(xiàn)程hash值定位到數(shù)組某個(gè)下標(biāo)中的元素不為空,對(duì)這個(gè)元素cas成功則直接返回,否則進(jìn)入longAccumulate方法
1. cell數(shù)組已經(jīng)初始化完成,主要是在cell數(shù)組中放元素,對(duì)cell數(shù)組進(jìn)行擴(kuò)容等操作
2. cell數(shù)組沒(méi)有初始化,則對(duì)數(shù)組進(jìn)行初始化
3. cell數(shù)組正在初始化,這時(shí)其他線(xiàn)程利用cas對(duì)baseCount進(jìn)行累加操作
完整代碼
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } } ```
獲取計(jì)算數(shù)量源碼
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } ```
需要注意的是,調(diào)用sum()返回的數(shù)量有可能并不是當(dāng)前的數(shù)量,因?yàn)樵谡{(diào)用sum()方法的過(guò)程中,可能有其他數(shù)組對(duì)base變量或者cell數(shù)組進(jìn)行了改動(dòng),所以需要確保所有線(xiàn)程運(yùn)行完再獲取就是準(zhǔn)確值
LongAdder 的前世今生
其實(shí)在 Jdk1.7 時(shí)代,LongAdder 還未誕生時(shí),就有一些人想著自己去實(shí)現(xiàn)一個(gè)高性能的計(jì)數(shù)器了,比如一款 Java 性能監(jiān)控框架 dropwizard/metrics 就做了這樣事,在早期版本中,其優(yōu)化手段并沒(méi)有 Jdk1.8 的 LongAdder 豐富,而在 metrics 的最新版本中,其已經(jīng)使用 Jdk1.8 的 LongAdder 替換掉了自己的輪子。在最后的測(cè)評(píng)中,我們將 metrics 版本的 LongAdder 也作為一個(gè)參考對(duì)象。
應(yīng)用場(chǎng)景
AtomicLong等原子類(lèi)的使用
并發(fā)少競(jìng)爭(zhēng)少(讀多寫(xiě)少)的計(jì)數(shù)原子操作
LongAdder 的使用
高性能計(jì)數(shù)器的首選方案, 單體項(xiàng)目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器
應(yīng)用場(chǎng)景功能:獲取全局自增id值
Synchronized與Lock的使用比較
Synchronized 適合少量的同步并發(fā)競(jìng)爭(zhēng)
Lock 適合大量的同步并發(fā)競(jìng)爭(zhēng)
總結(jié)
并發(fā)情況優(yōu)化鎖思路:
互斥鎖 -> 樂(lè)觀(guān)鎖 -> 鎖的粒度控制
在Java中對(duì)應(yīng)的實(shí)現(xiàn)方式:
ReentrantLock或者Syschronized -> CAS + Volatile -> 拆分競(jìng)爭(zhēng)點(diǎn)(longAddr,分布式累加器,ConcurrentHashMap等)
ReentrantLock或者Syschronized 在高并發(fā)時(shí)都存在獲取鎖等待、阻塞、喚醒等操作,所以在使用的使用注意拆分競(jìng)爭(zhēng)點(diǎn)。
AtomicLong
1. 并發(fā)量非常高,可能導(dǎo)致都在不停的爭(zhēng)搶該值,可能導(dǎo)致很多線(xiàn)程一致處于循環(huán)狀態(tài)而無(wú)法更新數(shù)據(jù),從而導(dǎo)致 CPU 資源的消耗過(guò)高。解決這個(gè)問(wèn)題需要使用LongAdder
2. ABA 問(wèn)題,比如說(shuō)上一個(gè)線(xiàn)程增加了某個(gè)值,又改變了某個(gè)值,然后后面的線(xiàn)程以為數(shù)據(jù)沒(méi)有發(fā)生過(guò)變化,其實(shí)已經(jīng)被改動(dòng)了。解決這個(gè)問(wèn)題請(qǐng)參考《擴(kuò)展:原子更新字段類(lèi)-ABA問(wèn)題解決》
synchronized
synchronized鎖升級(jí)實(shí)際上是把本來(lái)的悲觀(guān)鎖變成了 在一定條件下 使用無(wú)所(同樣線(xiàn)程獲取相同資源的偏向鎖),以及使用樂(lè)觀(guān)(自旋鎖 cas)和一定條件下悲觀(guān)(重量級(jí)鎖)的形式。
偏向鎖:適用于單線(xiàn)程適用鎖的情況
輕量級(jí)鎖:適用于競(jìng)爭(zhēng)較不激烈的情況(這和樂(lè)觀(guān)鎖的使用范圍類(lèi)似)
重量級(jí)鎖:適用于競(jìng)爭(zhēng)激烈的情況
LongAdder
- AtomicLong :并發(fā)場(chǎng)景下讀性能優(yōu)秀,寫(xiě)性能急劇下降,不適合作為高性能的計(jì)數(shù)器方案。內(nèi)需求量少。
- LongAdder :并發(fā)場(chǎng)景下寫(xiě)性能優(yōu)秀,讀性能由于組合求值的原因,不如直接讀值的方案,但由于計(jì)數(shù)器場(chǎng)景寫(xiě)多讀少的緣故,整體性能在幾個(gè)方案中最優(yōu),是高性能計(jì)數(shù)器的首選方案。由于 Cells 數(shù)組以及緩存行填充的緣故,占用內(nèi)存較大。
最佳方案
高性能計(jì)數(shù)器的首選方案, 單體項(xiàng)目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器
應(yīng)用場(chǎng)景功能:獲取全局自增id值