侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

AQS源码解读-独占模式下获取锁和释放锁

再见理想
2022-05-29 / 0 评论 / 0 点赞 / 600 阅读 / 3,652 字

一,获取锁 acquire(int)

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是 lock() 的语义,当然不仅仅只限于 lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是 acquire() 的源码:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

函数流程如下:

1,tryAcquire(arg) 尝试获取锁,获锁成功则流程结束。(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次)

2,获锁失败,addWaiter(Node.EXCLUSIVE) 将线程加入队列尾部,并标记为独占锁。

3,acquireQueued() 使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

4,如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。

下面具体分析下:

1.1 tryAcquire(int)

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现。AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

这里之所以没有定义成 abstract,是因为独占模式下只用实现 tryAcquire-tryRelease,而共享模式下只用实现 tryAcquireShared-tryReleaseShared。如果都定义成 abstract,那么每个模式也要去实现另一模式下的接口。

1.2 .1 addWaiter(Node)

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。还是上源码吧:

private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //上一步失败则通过enq入队。
    enq(node);
    return node;
}

1.2 .2 end(Node)

此方法用于尝试快速方式直接放到队尾失败情况下,将node加入队尾。源码如下:

private Node enq(final Node node) {
    // CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) {   // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {   //正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如果你看过AtomicInteger.getAndIncrement()函数源码,那么相信你一眼便看出这段代码的精华。CAS自旋volatile变量,是一种很经典的用法。

1.3.1 acquireQueued(Node, int)

通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。 acquireQueued() 就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键,还是上源码吧:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标记是否成功拿到资源
    try {
        boolean interrupted = false;//标记等待过程中是否被中断过

        //又是一个“自旋”!
        for (;;) {
            final Node p = node.predecessor(); //拿到前驱
             // 如果前继节点为头结点,说明排队马上排到自己了,可以尝试获取资源,若获取资源成功,则执行下述操作
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
				// 说明前继节点已经释放掉资源了,将其next置空,以方便虚拟机回收掉该前继节点
                p.next = null;
                failed = false; // 成功获取资源
                return interrupted;//返回等待过程中是否被中断过
            }

            //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
        }
    } finally {
        if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
            cancelAcquire(node);
    }
}

到这里了,我们先不急着总结 acquireQueued() 的函数流程,先看看 shouldParkAfterFailedAcquire() 和 parkAndCheckInterrupt() 具体干些什么。

1.3.2 shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了只是瞎站着,那也说不定!

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
        return true;
    if (ws > 0) {
        /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
         */
        do {
			// 等同于  node.prev = (pred = pred.prev)
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点(node.prev = pred = pred.prev;),同时可以再尝试下看有没有机会轮到自己拿号。

1.3.3 parkAndCheckInterrupt()

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

park() 会让当前线程进入 waiting 状态。在此状态下,有两种途径可以唤醒该线程:① 被 unpark();②被 interrupt()。需要注意的是,Thread.interrupted() 会清除当前线程的中断标记位。

1.3.4 acquireQueued() 小结

总结下该函数的具体流程:

1,结点进入队尾后,检查状态,找到安全休息点;

2,调用 park() 进入 waiting 状态,等待 unpark() 或 interrupt() 唤醒自己;

3,被唤醒后,看自己是不是是否能拿到锁。如果拿到,head 指向当前结点,并返回从入队到拿到锁的整个过程中是否被中断过;如果没拿到,继续流程。

1.4 acquire() 小结

接下来再回到 acquire()!再贴上它的源码吧:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1,调用自定义同步器的 tryAcquire() 尝试直接去获取资源,如果成功则直接返回;

2,没成功,则 addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式;

3,acquireQueued() 使线程在等待队列中休息,有机会时(轮到自己,会被 unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false。

4,如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。

由于此函数是重中之重,我再用流程图总结一下:

至此,acquire()的流程终于算是告一段落了。这也就是 ReentrantLock.lock() 的流程。

二,释放锁 release(int)

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据 tryRelease() 的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!

2.1 tryRelease(int)

此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease() 都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可 (state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release() 是根据 tryRelease() 的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回 true,否则返回 false。

2.2 unparkSuccessor(Node)

此方法用于唤醒等待队列中下一个线程。下面是源码:

private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)  //置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;  //找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {  //如果为空或已取消
        s = null;
		 // 从后向前找,找到等待队列最前面的一个有效节点
        for (Node t = tail; t != null && t != node; t = t.prev)  
            if (t.waitStatus <= 0)   //从这里可以看出,<=0 的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

这个函数并不复杂:用 unpark() 唤醒 head 节点 A;若 A节点无效,寻找 head 节点的下一节点 B ;若 B 为空或者是取消状态,则**从 tail 节点往前遍历找到处于等待队列最前面的一个正常阻塞状态的结点。**

这里我们也用 s 来表示被唤醒的节点吧。此时,再和 acquireQueued() 联系起来,s 被唤醒后,进入 if (p == head && tryAcquire(arg)) 的判断(即使 p != head 也没关系,它会再进入 shouldParkAfterFailedAcquire() 寻找一个安全点。这里既然 s 已经是等待队列中最前边的那个未放弃线程了,那么通过 shouldParkAfterFailedAcquire() 的调整,s 也必然会跑到 head 的 next 结点,下一次自旋 p==head 就成立啦),然后s把自己设置成 head标杆结点,表示自己已经获取到资源了,acquire() 也返回了!!

2.3 小结

release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

2.4 注意

如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?是不是没法再被唤醒了?

答案是yes,这时,队列中等待锁的线程将永远处于park状态,无法再被唤醒!!!但是我们再回头想想,获取锁的线程在什么情形下会release抛出异常呢??

1,线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;

2,线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异常;

3,release代码有bug,抛出异常了?目前来看,Doug Lea的release方法还是比较健壮的,没有看出能引发异常的情形(如果有,恐怕早被用户吐槽了)。除非自己写的tryRelease()有bug,那就没啥说的,自己写的bug只能自己含着泪去承受了。

0

评论区