一,获取锁 acquire(int)
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是 lock() 的语义,当然不仅仅只限于 lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是 acquire() 的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
函数流程如下:
1,tryAcquire(arg) 尝试获取锁,获锁成功则流程结束。(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次)
2,获锁失败,addWaiter(Node.EXCLUSIVE) 将线程加入队列尾部,并标记为独占锁。
3,acquireQueued() 使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4,如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。
下面具体分析下:
1.1 tryAcquire(int)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现。AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
这里之所以没有定义成 abstract,是因为独占模式下只用实现 tryAcquire-tryRelease,而共享模式下只用实现 tryAcquireShared-tryReleaseShared。如果都定义成 abstract,那么每个模式也要去实现另一模式下的接口。
1.2 .1 addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
还是上源码吧:
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
1.2 .2 end(Node)
此方法用于尝试快速方式直接放到队尾失败情况下,将node加入队尾。源码如下:
private Node enq(final Node node) {
// CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else { //正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果你看过AtomicInteger.getAndIncrement()函数源码,那么相信你一眼便看出这段代码的精华。CAS自旋volatile变量,是一种很经典的用法。
1.3.1 acquireQueued(Node, int)
通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。 acquireQueued() 就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键,还是上源码吧:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记等待过程中是否被中断过
//又是一个“自旋”!
for (;;) {
final Node p = node.predecessor(); //拿到前驱
// 如果前继节点为头结点,说明排队马上排到自己了,可以尝试获取资源,若获取资源成功,则执行下述操作
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
// 说明前继节点已经释放掉资源了,将其next置空,以方便虚拟机回收掉该前继节点
p.next = null;
failed = false; // 成功获取资源
return interrupted;//返回等待过程中是否被中断过
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
到这里了,我们先不急着总结 acquireQueued() 的函数流程,先看看 shouldParkAfterFailedAcquire() 和 parkAndCheckInterrupt()
具体干些什么。
1.3.2 shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了只是瞎站着,那也说不定!
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
do {
// 等同于 node.prev = (pred = pred.prev)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息
,需要去找个安心的休息点(node.prev = pred = pred.prev;),同时可以再尝试下看有没有机会轮到自己拿号。
1.3.3 parkAndCheckInterrupt()
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
park() 会让当前线程进入 waiting 状态。在此状态下,有两种途径可以唤醒该线程:① 被 unpark();②被 interrupt()
。需要注意的是,Thread.interrupted() 会清除当前线程的中断标记位。
1.3.4 acquireQueued() 小结
总结下该函数的具体流程:
1,结点进入队尾后,检查状态,找到安全休息点;
2,调用 park() 进入 waiting 状态,等待 unpark() 或 interrupt() 唤醒自己;
3,被唤醒后,看自己是不是是否能拿到锁。如果拿到,head 指向当前结点,并返回从入队到拿到锁的整个过程中是否被中断过;如果没拿到,继续流程。
1.4 acquire() 小结
接下来再回到 acquire()!再贴上它的源码吧:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1,调用自定义同步器的 tryAcquire() 尝试直接去获取资源,如果成功则直接返回;
2,没成功,则 addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式;
3,acquireQueued() 使线程在等待队列中休息,有机会时(轮到自己,会被 unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false。
4,如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。
由于此函数是重中之重,我再用流程图总结一下:
至此,acquire()的流程终于算是告一段落了。这也就是 ReentrantLock.lock() 的流程。
二,释放锁 release(int)
此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据 tryRelease() 的返回值来判断该线程是否已经完成释放掉资源了
!所以自定义同步器在设计tryRelease()的时候要明确这一点!!
2.1 tryRelease(int)
此方法尝试去释放指定量的资源。下面是tryRelease()的源码:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease() 都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可 (state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release() 是根据 tryRelease() 的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回 true,否则返回 false。
2.2 unparkSuccessor(Node)
此方法用于唤醒等待队列中下一个线程。
下面是源码:
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0) //置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; //找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) { //如果为空或已取消
s = null;
// 从后向前找,找到等待队列最前面的一个有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //从这里可以看出,<=0 的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
这个函数并不复杂:用 unpark() 唤醒 head 节点 A;若 A节点无效,寻找 head 节点的下一节点 B ;若 B 为空或者是取消状态,则**从 tail 节点往前遍历找到处于等待队列最前面的一个正常阻塞状态的结点。**
这里我们也用 s 来表示被唤醒的节点吧。此时,再和 acquireQueued() 联系起来,s 被唤醒后,进入 if (p == head && tryAcquire(arg)) 的判断(即使 p != head 也没关系,它会再进入 shouldParkAfterFailedAcquire() 寻找一个安全点。这里既然 s 已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire() 的调整,s 也必然会跑到 head 的 next 结点,下一次自旋 p==head 就成立啦),然后s把自己设置成 head标杆结点,表示自己已经获取到资源了,acquire() 也返回了!!
2.3 小结
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
2.4 注意
如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?是不是没法再被唤醒了?
答案是yes,这时,队列中等待锁的线程将永远处于park状态,无法再被唤醒!!!但是我们再回头想想,获取锁的线程在什么情形下会release抛出异常呢??
1,线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;
2,线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异常;
3,release代码有bug,抛出异常了?目前来看,Doug Lea的release方法还是比较健壮的,没有看出能引发异常的情形(如果有,恐怕早被用户吐槽了)。除非自己写的tryRelease()有bug,那就没啥说的,自己写的bug只能自己含着泪去承受了。
评论区