AbstractQueuedSynchronizer原理解析

AbstractQueuedSynchronizer简称AQS是Java大部分Lock、Semaphore、CountDownLatch等公共依赖框架,实现依赖于先进先出(FIFO)等待队列的阻塞锁。读懂它的代码原理有利我们去理解Java Lock衍生类原理,帮组我们开发自定义Lock。

主要原理

线程队列.png
由上图所示,在队列内的元素都为执行线程,在队列头部head就是获取到独占锁的执行线程,其他线程都在队列中排队沉睡中,想要获取锁的线程会从tail中加入队列。当head释放锁了,会将next线程唤醒,去获取锁,成功获取后再将head指向next线程。这里主要简单说一下基本原理,具体怎么操作我们一起进入代码讲解吧。

Node内部类属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static final class Node {
/** 标记当前节点共享锁标记 */
static final Node SHARED = new Node();
/** 标记当前节点独占锁标记 */
static final Node EXCLUSIVE = null;

/** 中断或者超时退出锁竞争 */
static final int CANCELLED = 1;
/** 锁即将被释放,这时需要唤醒下一个获取线程同时将head节点指向下一个节点 */
static final int SIGNAL = -1;
/** 同步队列condition */
static final int CONDITION = -2;
//共享锁
static final int PROPAGATE = -3;

//线程等待状态 分别对应上面4种状态
volatile int waitStatus;
//前置结点引用
volatile Node prev;
//后置结点引用
volatile Node next;

volatile Thread thread;
//下一个需要唤醒结点
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

Node() {}

/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}

/** Constructor used by addConditionWaiter. */
Node(int waitStatus) {
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
}

AbstractQueuedSynchronizer 内部属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

/**
* 队列头
*/
private transient volatile Node head;

/**
* 队列尾
*/
private transient volatile Node tail;

/**
* 同步状态,根据设置这个值来获取锁,默认线程都是从0开始
*/
private volatile int state;

protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}

进入ReentrantLock,获取锁的时候,调用AQS那个方法

1
2
3
public void lock() {
sync.acquire(1);
}

Sync是ReentrantLock内部类,继承自AbstractQueuedSynchronizer,用于实现公平锁和非公平锁。

acquire

1
2
3
4
5
6
public final void acquire(int arg) {
//尝试去获取锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取锁失败先初始化队列再排队
selfInterrupt(); //设置当前线程中断
}

主要流程先获取锁,如果失败了进入队列中排队。

acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) { //自旋
final Node p = node.predecessor(); //前置结点
if (p == head && tryAcquire(arg)) {//前置节点为head,下一个即将获得锁 尝试去获取锁
setHead(node); //获取成功了,将head 指向node
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) //判断前置节点waitStatus 是不是 SIGNAL,如果不是不会挂起线程
interrupted |= parkAndCheckInterrupt(); //挂起线程,当线程被唤醒将返回线程中断状态,并且清理中断
}
} catch (Throwable t) {
cancelAcquire(node); //唤醒线程,移除出堵塞队列
if (interrupted)
selfInterrupt();
throw t;
}
}

acquireQueued方法要么获取锁成功跳出循环,要么出现异常进入异常处理逻辑。

shouldParkAfterFailedAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //线程准备被唤醒
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //大于0都是CANCELLED 状态 线程要退出锁竞争
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; //删除 ws = CANCELLED的 队列节点
} else { // 0 或者PROPAGATE都改成
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}

这个方法就两个作用,比较waitStatus或者设置,将node结点向前移动。
当p=head 相等,有可能是head还是空,队列还没有初始化成功,这个时候才一次去获取锁。还有就是node就head后置结点,或者node重入尝试获取锁,获取锁成功了,重新设置头结点。

cancelAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

Node pred = node.prev;
while (pred.waitStatus > 0) //跳过状态大于0线程
node.prev = pred = pred.prev;

Node predNext = pred.next;

//将当前结点设置成CANCELLED状态,退出锁竞争
node.waitStatus = Node.CANCELLED;

//如果node是tail,直接将前置结点设置成tail ,
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null); //将tail.next = null
} else {
int ws;
// 向判断前置结点不能是head,在判断状态是SIGNAL,再判断node后继结点状态是否合法
// 所以条件都成立,将前后两个节结点关联起来,相当于直接删除自己。
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);//前后结合
} else {
unparkSuccessor(node); //唤醒后置结点
}

node.next = node; // help GC
}
}

这个方法主要任务就寻找状态合法前置结点,设置线程状态CANCELLED,退出线程锁竞争,找到后置线程,将前后指引关联起来,从队列中删除自己。但是有可能前置节点head,这时就需要唤醒后置节点,删除状态为CANCELLED节点,获取锁

unparkSuccessor

看下怎么唤醒线程的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) //因为要唤醒下一个线程,应该要清除SIGNAL 状态
node.compareAndSetWaitStatus(ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev) //从后往前遍历,找到最靠近结点
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread); //唤醒线程
}

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
Node node = new Node(mode);

for (;;) {
Node oldTail = tail;
if (oldTail != null) {//tail 已经存在,将线程加入tail
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue(); //初始化队列head,tail 属性
}
}
}

根据给定的模式去创建排队节点,mode只要分成两个模式Node.EXCLUSIVE排他锁和Node.SHARED共享锁。
tzIwuD.png
结合上面三个方法分析,对获取锁失败有了些简单了解。线程使用tryAcquire去获取锁,如果获取失败了,进入排队方法,此时先判断队列tail存在,已经存在直接尾插法插入,否则先初始化队列tail和head。这里知道队列初始化不是由先获取到锁去初始化的,而是由竞争失败的线程去创建队列,这样做是为了性能吧。acquire会根据前置节点为head的情况下不断去获取锁,设置一个不间断的锁竞争趋势。然后将当前线程挂起,当线程被唤醒,第一个要去做的就是去获取锁成功,跳出acquire循环。返回线程中断状态,只有当线程中断状态存在,再次调用线程中断。
本篇开头线程队列图片在有些情况下,并不是图片显示情况那样的。在队列刚开始初始化的时候,head节点并不是获取锁的节点,head只是随机创建Node对象,和tail都是同一个对象。当有线程从tail.next关联,进入队列,也简介和head建立起关系了,只要head最靠近的结点获取到锁后,将head指向自己才会往开篇队列图片方向走。

分析tryAcquire

tryAcquire是获取锁唯一实现,主要有子类实现。主要因为AQS是支持独占锁和共享锁、是否支持重入,更适合由子类去实现获取锁逻辑。进入ReentranLock了解公平锁和非公平锁如何实现tryAcquire,这两种情况一起分析下。

非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

abstract static class Sync extends AbstractQueuedSynchronizer {
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//state 第一次竞争锁
if (compareAndSetState(0, acquires)) { //交换比较state成功,算是获取锁成功了
setExclusiveOwnerThread(current);//设置属性变量
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //锁重入,只对state += acquires 处理
int nextc = c + acquires;
if (nextc < 0) // int 越界
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

非常简单,使用CAS(交换比较)最先设置成功线程,就算获取成功,在判断获取锁线程是否是占有锁线程,支持重入锁。

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class FairSync extends Sync {
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

主要区别多了一个hasQueuedPredecessors判断,进入方法分析下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {//队列已经初始化
//大于0就是CANCELLED ,s会被移除出队列,下一个节点不可以了,获取下下个节点
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
for (Node p = tail; p != h && p != null; p = p.prev) { //从尾链一直向前遍历
if (p.waitStatus <= 0) //waitStatus 状态正常
s = p;
}
}
if (s != null && s.thread != Thread.currentThread()) //获取锁的线程只要不是下一个即将去获取锁的线程
return true;
}
return false;
}

先判断head节点是否初始化了,如果没有直接返回false。已经存在直接获取head节点后继节点,只有判断节点线程和当前线程不相等就返回true。这是为什么呢? 想一下锁开始释放的时候,唤醒下一个节点去获取锁,这时候有一个线程也去获取锁,有可能抢占了队列中排队节点获取到锁,相当于插队这是”不公平”的。
公共和不公平主要区别就是在获取锁的时候先判断队列中是否有排队线程,如果存在,直接获取失败,入列排队,强制保证排队最长队列最先获取到锁的原则。公平锁会比非公平锁多一点点性能消耗,但是影响不是很大的,在平常开发上使用公平锁也是一个不错选择,毕竟大家都是选择追求公平的😝。

释放锁

ReentranLock.unlock()的代码

1
2
3
public void unlock() {
sync.release(1);
}

release

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { //这个需要子类去实现
Node h = head;
if (h != null &&stat h.waitStatus != 0) //状态不能为0
unparkSuccessor(h); //唤醒下一个队列线程
return true;
}
return false;
}

h.waitStatus = 0 就是状态还是默认状态,我们知道head状态是由shouldParkAfterFailedAcquire修改的,这时候线程还在自旋获取锁,不需要唤醒。

tryRelease

ReentranLock的tryRelease 是公平锁和非公平锁的释放锁的实现。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) //释放锁线程必须是拥有者线程,不然直接抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //state 必须等于0了,才算真正释放锁,对应了锁重入 state计算
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

每次调用tryRelease(),只要不是非法线程都可以让state - releases,又不会唤醒线程,只要state=0了,才真正释放锁,设置占有线程为null。唤醒队列中等待线程。
一个ReentranLock获取锁释放锁流程就已经走完了,但是AbstractQueuedSynchronizer中还有很多方法,本着解析这类来的,把其他功能也分析一下吧。

lockInterruptibly

lockInterruptibly在获取锁的时候或者入列排队等待,线程可以被中断强制退出锁竞争。这个是ReentrantLock才有的功能,synchronized关键字并不支持获取锁中断。
ReentrantLock 代码

1
2
3
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

acquireInterruptibly

看一下acquireInterruptibly是怎么样的

1
2
3
4
5
6
7
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //已经已经中断了,并且清理中断状态
throw new InterruptedException();
if (!tryAcquire(arg)) //获取锁失败
doAcquireInterruptibly(arg); //失败 入列挂起等待
}

看下入队线程怎么支持中断的

doAcquireInterruptibly

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); //加入队列当前结点
try {
for (;;) { //自旋
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
// 先检查前置结点状态,移除无效线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //挂起线程 返回线程中断状态
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

获取锁流程和lock方法基本上一样,在获取锁之前会判断线程中断,并且去处理它。在入队的线程,线程已经被挂起是不能处理中断的,只要当线程被唤醒了,直接抛出异常,退出锁竞争。

获取锁超时

ReentrantLock 有个方法可以设置在指定时间内去获取锁,避免等待获取锁的时候,线程被一直堵塞下去。通过设置超时时间,在锁获取失败了可以让调用者自己去处理。直接进入代码讲解

1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

看下 AQS怎么实现

1
2
3
4
5
6
7
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || //获取失败了 进入下面方法
doAcquireNanos(arg, nanosTimeout);
}

doAcquireNanos

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout; //超时时间戳
final Node node = addWaiter(Node.EXCLUSIVE); //入队
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) { //已经超时
cancelAcquire(node); // 退出队列
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) //大于1000 ns 线程才需要被挂起 不然时间太短了,还没有挂起就已经结束了
LockSupport.parkNanos(this, nanosTimeout); //挂起线程,指定时间会比唤醒
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

根据百度上描述纳秒,计算机执行一个执行一道指令(如将两数相加)约需2至4纳秒,太小时间间隔就没有意义了。这个方法比正常方法多了超时判断和超时唤醒线程,其他都一样。

ConditionObject

ConditionObject是AbstractQueuedSynchronizer内部类,实现Condition接口主要功能堵塞线程和唤醒堵塞,有点类似wait、notfiy、notifyAll,一般都是用在同步队列的生产者和消费者的线程堵塞唤醒。先解析Condition 接口每个方法含义,在具体分析方法实现。

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface Condition {

// 使当前线程等待,直到收到信号或者中断
void await() throws InterruptedException;

//使当前线程等待,直到收到信号
void awaitUninterruptibly();

// 使当前线程等待,直到收到信号或者超过指定时间或者发出中断
long awaitNanos(long nanosTimeout) throws InterruptedException;

// 同上
boolean await(long time, TimeUnit unit) throws InterruptedException;

//当前线程等待,直到收到信息或者中断或者超过指定时间
boolean awaitUntil(Date deadline) throws InterruptedException;

// 唤醒单个等待线程
void signal();

//唤醒全部等待线程
void signalAll();

在解析实现类之前,要先知道ConditionObject内部属性

1
2
3
4
5
6
7
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

public ConditionObject() { }

内部属性就两个,头结点、尾结点,下面开始分析实现方法。

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void await() throws InterruptedException {
if (Thread.interrupted()) //返回线程中断状态,并且清理中断信号
throw new InterruptedException();
Node node = addConditionWaiter(); //线程进入等待队列中
int savedState = fullyRelease(node); //执行释放锁功能,返回state 值
int interruptMode = 0;
// //node 是否在队列中
while (!isOnSyncQueue(node)) { //不在队列 挂起线程
LockSupport.park(this);
//当线程被唤醒 清理中断状态 重新加入等待队列中
//当线程被唤醒,判断是否存在中断信号,并且返回给interruptMode
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//在队中自旋获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); //清除线程状态不为CONDITION
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
  1. 先判断线程是否存在中断,中断则直接抛出异常。
  2. 将Node加入有ConditionObject 维护的单向链表,这个列表主要用在队列唤醒,和获取锁的队列并不冲突。
  3. 释放锁,刚开始有点想不明白这里的,这个要结合同步队列来理解。当线程被挂起的时候必须要释放锁,不然变成死锁了。生产者获取不到锁,不能插入数据,就无法唤醒消费者。
  4. while 循环处理,只要node已经加入线程队列参与锁的竞争了,才会退出循环,或者先将当前线程挂起。当线程被唤醒了,判断线程是否因为中断而被唤醒,如果是就直接跳出while 循环。
  5. 因为已经加入队列,现在可以去获取锁 。
  6. 此时node结点已经获取到锁了waitStatus 已经产生变化了,需要清除单向链表中状态不合法结点包括它自己。
  7. 当interruptMode 不等于0,则说明有中断需要处理,需要调用者自己去处理。

addConditionWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addConditionWaiter() {
if (!isHeldExclusively()) //不是当前锁拥有者
throw new IllegalMonitorStateException();
Node t = lastWaiter;
// 如果lastWaiter 不是CONDITION状态了,移除出队列,CONDITION是同步队列专属状态
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); //将状态不等于CONDITION 移除nextWaiter 队列
t = lastWaiter;
}

Node node = new Node(Node.CONDITION); //创建结点,设置waitStatus 为CONDITION

if (t == null) //头节点还没初始化
firstWaiter = node;
else
t.nextWaiter = node; //node不是头就是尾结点
lastWaiter = node;
return node;
}

nextWaiter队列.png
在ConditioinObject中会维护一个nextWaiter连起来的单向condition链表,这个链表主要特性就是,所以Node.waitStatus 必须是Node.CONDITION,会将链表头部结点、尾部结点放入lastWaiter、firstWaiter结点中。

unlinkCancelledWaiters

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) { //从头开始遍历
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null; //切断 t 和后面结点关联,下面方便删除
if (trail == null) //将头节点删除,下一个结点就是头结点
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

遍历整个condition链表,将Node.waitStatus != Node.CONDITION删除。

isOnSyncQueue

1
2
3
4
5
6
7
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null) //node 刚释放锁 还没进一次进入队列
return false;
if (node.next != null) //前置结点不为空,肯定在队列中
return true;
return findNodeFromTail(node); //从tail 向前查找node 找到返回true
}

checkInterruptWhileWaiting

1
2
3
4
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

检查线程中断,并且清理它。没有中断直接返回0,不会执行transferAfterCancelledWait。中断了就会执行执行transferAfterCancelledWait并且返回一个中断结构。

  • REINTERRUPT 在等待退出时中断退出
  • THROW_IE 抛出一个InterruptedException 异常退出

    transferAfterCancelledWait

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    final boolean transferAfterCancelledWait(Node node) {
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { //waitStatus CONDITION => 0 设置成功
    enq(node); //加入队列
    return true;
    }

    while (!isOnSyncQueue(node)) //node 不在队列中
    Thread.yield(); //线程让出执行权 等待node进入队列后 跳出自旋
    return false;
    }

根据之前获取锁的代码,加入队列结点waitStatus 都是默认值0,只要aitStatus CONDITION => 0 设置成功才能加入队列,参与锁竞争。设置失败就会自旋判断node 是否进入队列中,必须进入队列中才会退出这个方法。

reportInterruptAfterWait

1
2
3
4
5
6
7
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE) // 重新抛出中断异常,交给调用者处理
throw new InterruptedException();xin
else if (interruptMode == REINTERRUPT) //退出时中断退出
selfInterrupt(); //调用线程中断
}

根据不同中断情况,做出不同处理。

signal

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively()) //必须是独占锁线程执行
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) //头结点不为空,说明condition队列存在,可以去唤醒
doSignal(first); //唤醒线程
}

看下唤醒线程如何实现的

doSignal

1
2
3
4
5
6
7
8
9
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) //下一个结点为空,链表已经到头了
lastWaiter = null;
first.nextWaiter = null;
//这个方法就是主要唤醒逻辑
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

transferForSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) //加入队列 waitStatus=0
return false;
//加入队列竞争锁
// p 是 node 前置结点
Node p = enq(node);
int ws = p.waitStatus;
// ws 大于0 p会退出队列,可以唤醒后置结点
// p 设置状态失败,p结点可以已经不存在了,这时候应该唤醒线程去获取锁了。
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread); //唤醒node 的 线程
return true; //唤醒成功返回true
}

这里唤醒线程的条件,是前置结点是否可用,只要当前置结点准备退出队列,或者已经被删除了。node被唤醒获取锁。
剩下几个方法,我不打算在写了,内容太多了。有兴趣自己去了解,内容都是差不多重复的。

总结

本篇通过获取锁释放锁原理去解析AbstractQueuedSynchronizer内部源码原理,也分析了公平锁和非公平锁的区别。讲解一些衍生功能的锁,如超时锁,中断锁,算是比较全部解析了AbstractQueuedSynchronizer作为Java ReentrantLock这类同位锁的框架支持,也简单分析了ConditionObject 实现线程协调signal、await方法。这篇文章大部分内容都是我自己思考想出来,如果有那些地方说错,请出来大家一起讨论。