深入 AQS 源碼,搞懂核心思想

原文出自:公眾號 郭兒的跋涉

原文連結:https://mp.weixin.qq.com/s/2Ltx9LAbIcWvwk_jvmSRcA

本文重點

深入 AQS 源碼,搞懂核心思想

前情提要

之前文章中寫到了 JDK 中 synchronized 關鍵字可以實現同步鎖,並且詳細分析了底層的實現原理。

Synchronized 原理及鎖升級分析

雖然 synchronized 在性能上不再被人詬病,但是在實際使用中仍然缺乏一定的靈活性。

比如在一些場景中需要去嘗試獲取鎖,如果失敗則不再進行等待,又或者設置一定的等待時間,超時後就放棄等待。

正文開始,Reentrantlock 介紹

java.util.concurrent.locks 包提供了多種鎖機制的實現,本文暫且以 ReentrantLock為例,探究 Java 是如何實現同步鎖的。

ReentrantLock,先看名字,reentrant——可重入的,表示持鎖線程可以多次加鎖。

使用案例如下:

ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
//do something
} finally {
lock.unlock();
}

AQS

ReentrantLock 底層是由 AQS 框架實現的,並且 java.util.concurrent.locks 提供的多種同步鎖都是 AQS 的子類。

深入 AQS 源碼,搞懂核心思想

AQS,先看名字,AbstractQueuedSynchronizer,抽象的隊列式同步器。AQS 是 Java 中同步鎖實現的基石,由 Java 之神 Doug Lea 操刀。

深入 AQS 源碼,搞懂核心思想

AbstractQueuedSynchronizer 的核心思想是提供了一個同步隊列,將未獲取到鎖的線程阻塞排隊。

提供的關鍵方法如下,此處用到了模板模式,方法的具體實現由子類來提供。

// 獲取鎖
public final void acquire(int arg) {
}
// 釋放鎖
public final boolean release(int arg) {
}
// 嘗試獲取鎖
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 嘗試釋放鎖
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

AQS 中的隊列由 node 組成的雙端列表實現。

static final class Node {
// 標識共享鎖
static final Node SHARED = new Node();
//標識獨占鎖
static final Node EXCLUSIVE = null;
// 已取消
static final int CANCELLED =  1;
//後繼節點在等待當前節點喚醒
static final int SIGNAL    = -1;
//等待在 condition 上
static final int CONDITION = -2;
//共享模式下,喚醒後繼節點
static final int PROPAGATE = -3;
//狀態,默認 0
volatile int waitStatus;
//前驅節點
volatile Node prev;
//後繼節點
volatile Node next;
volatile thread thread;
Node nextWaiter;
}
/**
// 頭結點
private transient volatile Node head;
// 尾結點
private transient volatile Node tail;
// 同步狀態
private volatile int state;

ReetrantLock 原理

1. 創建公平鎖

ReentrantLock lock = new ReentrantLock(true) ,參數 true 標識創建的是公平鎖。

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

2、執行 lock()

final void lock() {
acquire(1);
}

acquire() 由 AQS 提供,並且不可重寫,源碼如下。

public final void acquire(int arg) {
// 1. 嘗試獲取鎖
if (!tryAcquire(arg) &&
// 2. 如果未獲取到, 則加入隊列
//addWaiter,創建節點,並設置下一節點為null,設置為尾結點
//acquireQueued,將當前節點加入隊列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//設置線程中斷標誌
selfInterrupt();
}

嘗試獲取鎖,tryAccquire() 由子類根據自己的策略實現

如果獲取失敗,則執行 addWaiter(),將當前線程封裝成一個 Node, 設置為尾結點

執行 acquireQueued(),將當前節點加入到同步隊列

tryAccquire

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state = 0 代表沒有線程占用
if (c == 0) {
// hasQueuedPredecessors 返回是否需要排隊,需要排隊則返回 true
if (!hasQueuedPredecessors() &&
//不需要排隊,則 CAS 設置 state = 1
compareAndSetState(0, acquires)) {
// 設置成功,則搶到鎖,設置持鎖線程為當前線程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果被是當前線程占用,則記錄重入次數
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

1、公平鎖實現的 tryAcquire,首先獲取同步狀態 state,不為 0 代表有線程持鎖,則判斷是否是當前線程,如果是則 state + 1, 因此是可重入的。

2、如果 state = 1,則檢查是否需要排隊 hasQueuedPredecessors,如果需要排隊則不繼續判斷,走加入隊列的邏輯。

3、如果不需要排隊,通過 CAS 設置同步狀態 state = 1, 設置失敗則去排隊,設置成功則設置持鎖線程為當前線程。

addWaiter()

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 將當前節點設置為尾節點
Node pred = tail;
if (pred != null) {
//設置節點上一節點為尾結點
node.prev = pred;
//CAS 操作,將尾結點設置為當前節點
if (compareAndSetTail(pred, node)) {
//設置之前尾結點下一節點,指向當前尾結點, 返回尾結點
pred.next = node;
return node;
}
}
//如果尾結點是null, 或者 CAS 操作失敗,進行自旋 enq() 加入尾結點
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾節點是null,則 CAS 初始化隊列的頭節點 head,頭結點是空節點, tail = head
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 設置當前節點前一節點為 tail
node.prev = t;
// CAS 操作設置當前節點為 尾結點
if (compareAndSetTail(t, node)) {
// 設置之前尾結點指向當前節點
t.next = node;
return t;
}
}
}
}

1、需要排隊的,先將線程封裝成一個 Node,如果尾結點為空,則說明隊列還未初始化,則通過 enq(node) 進行初始化操作。

2、如果尾結點不為空,則通過 CAS 嘗試將當前節點設置為尾結點,返回當前節點。

3、如果 CAS 執行失敗,則通過 enq(node) 進行自旋嘗試加入隊尾。

acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// p 是前一節點
final Node p = node.predecessor();
// 如果 p 是頭節點, 再掙扎一次嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//獲取成功,設置當前節點為頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 返回false, 獲取鎖成功,沒有進入隊列
}
// 檢查是否需要線程阻塞等待,如果前一個不是待喚醒,則自旋
// 如果前一個待喚醒,則當前線程也阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前一個節點等待被喚醒
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
// 前一節點已被取消,跳過,再向前找節點
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 默認值是0,設置為待喚醒
// CAS 設置前一節點等待喚醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞線程
LockSupport.park(this);
return Thread.interrupted();
}

1、通過 acquireQueued 將當前節點加入隊尾,並設置阻塞。自旋,判斷如果當前節點的前驅節點。是頭節點(head 節點不排隊,只記錄狀態,head 的後驅節點才是真正第一個排隊的),則再次嘗試 tryAcquire() 獲取鎖。

2、可以看到自旋的跳出條件是當前節點是隊列中第一個,並且獲取鎖。

3、如果一直自旋,則會消耗 CPU 資源,因此使用 shouldParkAfterFailedAcquire 判斷是否需要將當前線程阻塞,如果是則通過 parkAndCheckInterrupt 阻塞線程的運行。

4、LockSupport.park() 是通過 native 方法 UNSAFE.park() 實現的線程阻塞。

3、執行 unlock()

釋放鎖的邏輯相對比較簡單。

public void unlock() {
sync.release(1);
}

AQS 的 release() 方法。

public final boolean release(int arg) {
//釋放鎖
if (tryRelease(arg)) {
Node h = head;
//隊列元素需要喚醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease()

protected final boolean tryRelease(int releases) {
//多次重入需要多次釋放
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//等於0代表釋放,設置鎖持有線程為null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//設置狀態為0
setState(c);
return free;
}

unparkSuccessor()

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// CAS 操作設置等待狀態為 0-初始態
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//取下一節點
Node s = node.next;
//如果下一節點狀態為取消,則從尾結點開始循環,找到最前面未取消的節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// unpark 喚醒線程
if (s != null)
LockSupport.unpark(s.thread);
}

1、unlock() 方法調用了 AQS 的 release()方法,執行 tryRelease() 釋放鎖。

2、tryRelease() 由子類實現,reentrantLock 釋放邏輯是將 state – 1, 直到 state = 0 才認為是釋放完畢,多次重入需要多次釋放。

3、通過 AQS 的 unparkSuccessor() 喚醒下一個等待的線程。

4、使用 LockSupport.unpark(s.thread) 喚醒線程。

公平與非公平

實際上公平與非公平的區別如下:

1、公平鎖的流程是,當前有線程持鎖,則新來的線程乖乖去排隊。

2、非公平鎖的流程是,新來的線程先去搶一把試試,沒搶到再去排隊。

非公平鎖的加鎖邏輯如下。

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

總結 AQS 的核心

AQS 中使用了幾個核心的操作來進行同步鎖的控制。

深入 AQS 源碼,搞懂核心思想

總結 ReentrantLock 流程

深入 AQS 源碼,搞懂核心思想

這還沒完,想要徹底搞定每一步的細節,還得去翻看源碼,細細品味。

來源:kknews深入 AQS 源碼,搞懂核心思想