一,概述
谈到并发,不得不谈 ReentrantLock;而谈到 ReentrantLock,不得不谈 AbstractQueuedSynchronizer(AQS)! 类如其名,抽象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的 ReentrantLock/Semaphore/CountDownLatch…。
二,框架
AQS 维护了一个 volatile int state
(代表共享资源) 和一个 FIFO 线程等待队列(多线程争用资源被阻塞时进入此队列)。这里 volatile 是核心关键词,state 的访问方式有三种:
getState()
setState()
compareAndSetState()
AQS 中几个可见性变量
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
AQS 中对这些 volatile 可见性变量的操作一般是通过 CAS 操作,底层通过 Unsafe 类实现,代码:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
AQS定义两种资源共享方式:Exclusive
(独占,只有一个线程能执行,如 ReentrantLock)和 Share
(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
# 该线程是否正在独占资源。只有用到condition才需要去实现它
isHeldExclusively()
# 独占方式。尝试获取资源,成功则返回true,失败则返回false
tryAcquire(int)
# 独占方式。尝试释放资源,成功则返回true,失败则返回false
tryRelease(int)
# 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
tryAcquireShared(int)
共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false
tryReleaseShared(int)
以 ReentrantLock 为例,state 状态初始化为 0,表示未锁定状态。A线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state +1,此后,其它线程再 tryAcquire() 时就会失败,指定 A 线程释放锁 unlock() 到 state = 0 为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己可以重复获取该锁(state 累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到 0 的。
再以 CountDownLatch 为例,任务分为 N 个子线程去执行,state 也初始化为 N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state会 CAS 减 1。等到所有子线程都执行完后(即 state=0 ),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。
三,源码
Node 结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。
结点状态 waitStatus 则表示当前 Node 结点的等待状态,共有5种取值 CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
CANCELLED(1)
# 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化
SIGNAL(-1)
# 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL
CONDITION(-2)
# 表示结点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION 状态的结点将从等待队列转移到同步队列中,等待获取同步锁
PROPAGATE(-3)
# 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点
0
# 新结点入队时的默认状态
注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用 >0、< 0 来判断结点的状态是否正常。
评论区