更新時(shí)間:2022-08-26 來源:黑馬程序員 瀏覽量:
一、文章導(dǎo)讀:
這部分內(nèi)容讓大家讀懂ConcurrentHashMap源碼的底層實(shí)現(xiàn)從而在工作中合理去使用他并且在面試中能做到游刃有余,主要內(nèi)容如下:
* 核心構(gòu)造方法
* 核心成員變量
* put方法過程數(shù)據(jù)的完整過程
* get方法的無(wú)鎖實(shí)現(xiàn)
二、文章講解內(nèi)容
JDK8中ConcurrentHashMap的結(jié)構(gòu)是:數(shù)組+鏈表+紅黑樹。
因?yàn)樵趆ash沖突嚴(yán)重的情況下,鏈表的查詢效率是O(n),所以jdk8中改成了單個(gè)鏈表的個(gè)數(shù)大于8時(shí),數(shù)組長(zhǎng)度小于64就擴(kuò)容,數(shù)組長(zhǎng)度大于等于64,則鏈表會(huì)轉(zhuǎn)換為紅黑樹,這樣以空間換時(shí)間,查詢效率會(huì)變O(nlogn)。
紅黑樹在Node數(shù)組內(nèi)部存儲(chǔ)的不是一個(gè)TreeNode對(duì)象,而是一個(gè)TreeBin對(duì)象,TreeBin內(nèi)部維持著一個(gè)紅黑樹。
在JDK8中ConcurrentHashMap最經(jīng)點(diǎn)的實(shí)現(xiàn)是使用CAS+synchronized+volatile 來保證并發(fā)安全。
一、JDK7中ConcurrentHashMap的實(shí)現(xiàn)
在JDK1.7中ConcurrentHashMap是通過定義多個(gè)Segment來實(shí)現(xiàn)的分段加鎖,一個(gè)Segment對(duì)應(yīng)一個(gè)數(shù)組,如果多線程對(duì)同一個(gè)數(shù)組中的元素進(jìn)行添加那么多個(gè)線程會(huì)去競(jìng)爭(zhēng)同一把鎖,他鎖的是一個(gè)數(shù)組,有多少個(gè)segment那么就有多少把鎖,這個(gè)力度還是比較粗的。
JDK8的實(shí)現(xiàn)是下文要重點(diǎn)探討的內(nèi)容,同時(shí)下文全部都是JDK8的實(shí)現(xiàn)。
二、核心構(gòu)造方法
/** * Creates a new, empty map with the default initial table size (16). 無(wú)參構(gòu)造函數(shù),new一個(gè)默認(rèn)table容量為16的ConcurrentHashMap */ public ConcurrentHashMap() { }
/** * Creates a new, empty map with an initial table size * accommodating the specified number of elements without the need * to dynamically resize. * * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative 做了3件事: 1.如果入?yún)?lt;0,拋出異常。 2.如果入?yún)⒋笥谧畲笕萘?,則使用最大容量(是2的30次方) 3.tableSizeFor方法得到一個(gè)大于負(fù)載因子入?yún)⑶易罱咏?的N次方的數(shù)作為容量 4.設(shè)置sizeCtl的值等于初始化容量。未對(duì)table進(jìn)行初始化,table的初始化要在第一次put的時(shí)候進(jìn)行。 */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
特別注意:未對(duì)table進(jìn)行初始化,table的初始化要在第一次put的時(shí)候進(jìn)行。
三、核心成員變量
// 該字段控制table(也被稱作hash桶數(shù)組)的初始化和擴(kuò)容 private transient volatile int sizeCtl; // table最大容量是2的30次方 private static final int MAXIMUM_CAPACITY = 1 << 30; // table的默認(rèn)初始化容量-擴(kuò)容總是2的n次方 private static final int DEFAULT_CAPACITY = 16; // table的負(fù)載因子。當(dāng)前已使用容量 >= 負(fù)載因子*總?cè)萘康臅r(shí)候,進(jìn)行resize擴(kuò)容 private static final float LOAD_FACTOR = 0.75f; // 存儲(chǔ)數(shù)據(jù)的數(shù)組-注意添加了volatile transient volatile Node<K,V>[] table; // 當(dāng)桶內(nèi)鏈表長(zhǎng)度>=8時(shí),會(huì)將鏈表轉(zhuǎn)成紅黑樹-注意需要和MIN_TREEIFY_CAPACITY結(jié)合起來用 static final int TREEIFY_THRESHOLD = 8; // table的總?cè)萘?,要大?4,桶內(nèi)鏈表才轉(zhuǎn)換為樹形結(jié)構(gòu),否則當(dāng)桶內(nèi)鏈表長(zhǎng)度>=8時(shí)會(huì)擴(kuò)容 static final int MIN_TREEIFY_CAPACITY = 64; // 當(dāng)桶內(nèi)node小于6時(shí),紅黑樹會(huì)轉(zhuǎn)成鏈表。 static final int UNTREEIFY_THRESHOLD = 6;
四、put存儲(chǔ)數(shù)據(jù)
先從JDK的源碼中復(fù)制一段上頭的代碼,如下代碼就完成了數(shù)據(jù)的添加,在添加的時(shí)候還完成了數(shù)組的初始化、擴(kuò)容、CAS修改、分段鎖的實(shí)現(xiàn),大家大體的對(duì)該代碼有一個(gè)認(rèn)識(shí)即可我們后面會(huì)詳細(xì)的畫圖分析該過程。
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { // 如果key或者value為空,拋出異常 if (key == null || value == null) throw new NullPointerException(); // 得到hash值 int hash = spread(key.hashCode()); // 用來記錄所在table數(shù)組中的桶的中鏈表的個(gè)數(shù),后面會(huì)用于判斷是否鏈表過長(zhǎng)需要轉(zhuǎn)紅黑樹 int binCount = 0; // for循環(huán),直到put成功插入數(shù)據(jù)才會(huì)跳出 for (Node<K,V>[] tab = table;;) { // f=桶頭節(jié)點(diǎn) n=table的長(zhǎng)度 i=在數(shù)組中的哪個(gè)下標(biāo) fh=頭節(jié)點(diǎn)的hash值 Node<K,V> f; int n, i, fh; // 如果table沒有初始化 if (tab == null || (n = tab.length) == 0) // 初始化table tab = initTable(); // 根據(jù)數(shù)組長(zhǎng)度減1再對(duì)hash值取余得到在node數(shù)組中位于哪個(gè)下標(biāo) // 用tabAt獲取數(shù)組中該下標(biāo)的元素 // 如果該元素為空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 直接將put的值包裝成Node用tabAt方法放入數(shù)組內(nèi)這個(gè)下標(biāo)的位置中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果頭結(jié)點(diǎn)hash值為-1,則為ForwardingNode結(jié)點(diǎn),說明正再擴(kuò)容, else if ((fh = f.hash) == MOVED) // 調(diào)用hlepTransfer幫助擴(kuò)容 tab = helpTransfer(tab, f); // 否則鎖住槽的頭節(jié)點(diǎn) else { V oldVal = null; // 鎖桶的頭節(jié)點(diǎn) synchronized (f) { // 雙重鎖檢測(cè),看在加鎖之前,該桶的頭節(jié)點(diǎn)是不是被改過了 if (tabAt(tab, i) == f) { // 如果桶的頭節(jié)點(diǎn)的hash值大于0 if (fh >= 0) { binCount = 1; // 遍歷鏈表 for (Node<K,V> e = f;; ++binCount) { K ek; // 如果遇到節(jié)點(diǎn)hash值相同,key相同,看是否需要更新value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果到鏈表尾部都沒有遇到相同的 if ((e = e.next) == null) { // 就生成Node掛在鏈表尾部,該Node成為一個(gè)新的鏈尾。 pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果桶的頭節(jié)點(diǎn)是個(gè)TreeBin else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; // 用紅黑樹的形式添加節(jié)點(diǎn)或者更新相同hash、key的值。 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果鏈表長(zhǎng)度>需要樹化的閾值 if (binCount >= TREEIFY_THRESHOLD) //調(diào)用treeifyBin方法將鏈表轉(zhuǎn)換為紅黑樹,而這個(gè)方法中會(huì)判斷數(shù)組值是否大于64,如果沒有大于64則只擴(kuò)容 treeifyBin(tab, i); if (oldVal != null) // 如果是修改,不是新增,則返回被修改的原值 return oldVal; break; } } } // 計(jì)數(shù)器加1,完成新增后,table擴(kuò)容,就是這里面觸發(fā) addCount(1L, binCount); // 新增后,返回Null return null; }
4.1 高效的hash算法
int hash = spread(key.hashCode()); static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
該算法其實(shí)在HashMap中我們就已經(jīng)重點(diǎn)說過了,他既保存了key值得hash的高16位也保存了低16位,從而讓key值在不同的數(shù)組中盡可能的散列,從而避免hash沖突。
4.2 數(shù)組的初始化
for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable();
注意是在循環(huán)中判斷的table是否為空如果為空則會(huì)調(diào)用initTable完成數(shù)組的默認(rèn)初始化。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab;
以上代碼做了如下事情:
1.如果table為空,進(jìn)入while準(zhǔn)備開始初始化。
2.將sizeCtl賦值給sc。如果sizeCtl<0,線程等待,小于零時(shí)表示有其他線程在執(zhí)行初始化。
3.如果能將sizeCtl設(shè)置為-1,則開始進(jìn)行初始化操作
4.用戶有指定初始化容量,就用用戶指定的,否則用默認(rèn)的16.
5.生成一個(gè)長(zhǎng)度為16的Node數(shù)組,把引用給table。
6.重新設(shè)置sizeCtl=數(shù)組長(zhǎng)度 - (數(shù)組長(zhǎng)度 >>>2) 這是忽略符號(hào)的移位運(yùn)算符,可以理解成 n - (n / 2)。
4.3 不沖突的數(shù)據(jù)添加過程
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }
高效的尋址算法: (n - 1) & hash這個(gè)在我們的HashMap中也是講過的了就不在贅述。
通過高效的尋址算法得到一個(gè)索引并獲取該索引的數(shù)據(jù)如果不為空則調(diào)用casTabAt方法進(jìn)行CAS的原子修改,CAS在Java層面是無(wú)鎖的所以速度會(huì)很快,但是他在硬件層面是有鎖的,他會(huì)在硬件的拉鏈散列表中加鎖。
如果有多個(gè)線程同時(shí)hash到了該索引我們的CAS也能保證只有一個(gè)線程能添加成功其他的線程其實(shí)是走分段加鎖的過程。
4.4 分段加鎖策略
需要大家特別注意的是synchronized (f)鎖了一個(gè)f對(duì)象,那么這個(gè)f對(duì)象是什么呢?其實(shí)就是我們高效尋址算法的到的一個(gè)下標(biāo)中存儲(chǔ)的對(duì)象,注意他鎖的是我們尋址到的這個(gè)對(duì)象并沒有鎖這個(gè)數(shù)組,所以我們當(dāng)前的鎖一共有多少把呢?就看你的size有多少了默認(rèn)是16那么就會(huì)有16把鎖可以來并發(fā)的修改,這個(gè)其實(shí)就是JDK1.8的分段鎖拉,他比1.7鎖的粒度更細(xì)那么并發(fā)的效果就會(huì)更好。
五、無(wú)鎖獲取
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
以上代碼中的內(nèi)容幾乎我們都講過了,所以就不在贅述,但是值得一說的是獲取的過程中并沒有對(duì)數(shù)組或者某一個(gè)Node元素添加鎖,獲取是無(wú)鎖的所有性能高。
還有一個(gè)問題需要注意的是獲取是無(wú)鎖的那么他如果出現(xiàn)多線程修改或者寫入的時(shí)候他就有可能會(huì)出現(xiàn)可見性的問題,因?yàn)槊恳粋€(gè)線程都有自己的工作內(nèi)存,那么ConcurrentHashMap是如何解決可見性的問題呢?
transient volatile Node<K,V>[] table;
從變量的申明中我們可以看到存儲(chǔ)數(shù)據(jù)的table其實(shí)是添加了volatile關(guān)鍵字的,所以其他線程修改了以后我們要求其他的線程把數(shù)據(jù)刷新主內(nèi)存從而保證數(shù)據(jù)的可見性。
五、總結(jié):
1. JDK1.7 使用分段鎖實(shí)現(xiàn)
2. JDK1.8 使用CAS+synchronized+volatile 的具體實(shí)現(xiàn)
3. put方法的復(fù)雜實(shí)現(xiàn)過程
4. get方法的無(wú)鎖實(shí)現(xiàn)尤其是volatile關(guān)鍵字的使用