侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

BlockingQueue 阻塞队列解析

再见理想
2022-10-11 / 0 评论 / 0 点赞 / 544 阅读 / 2,494 字

一,BlockingQueue

阻塞队列 (BlockingQueue) 是 Java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:

当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。


并发包下很多高级同步类的实现都是基于BlockingQueue实现的。 在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,因此可以好好的深入了解一下这个BlockingQueue阻塞队列。

主要常用的阻塞队列有如下:

  • ArrayBlockingQueue: 由数组支持的有界队列;
  • LinkedBlockingQueue: 由链接节点支持的可选有界队列;
  • PriorityBlockingQueue: 由优先级堆支持的无界优先级队列;
  • DelayQueue: 由优先级堆支持的、基于时间的调度队列;

二,ArrayBlockingQueue

原理:

阻塞队列(BlockingQueue)是在队列的基础上增加了两个附加操作, 在队列为空的时候,获取元素的线程会等待队列变为非空(take 方法)。 当队列满时,存储元素的线程会等待队列可用(put 方法)。

要实现这样的一个阻塞队列,需要用到两个关键的技术,队列元素的存储、以及 线程阻塞和唤醒

而 ArrayBlockingQueue 是基于数组结构的阻塞队列,也就是队列元素是存储在 一个数组结构里面,并且由于数组有长度限制,为了达到循环生产和循环消费的目的,ArrayBlockingQueue 用到了循环数组

而线程的阻塞和唤醒,用到了 J.U.C 包里面的 ReentrantLock 和 Condition。 Condition 相当于 wait/notify 在 JUC 包里面的实现。

由于阻塞队列的特性,可以非常容易实现生产者消费者模型,也就是生产者只需 要关心数据的生产,消费者只需要关注数据的消费,所以如果队列满了,生产者 就等待,同样,队列空了,消费者也需要等待。


常用方法:

  • add(E e):把 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则报异常;
  • offer(E e):表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false;
  • put(E e):把 e 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续;
  • poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;
  • take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止;
  • remainingCapacity():剩余可用的大小。等于初始容量减去当前的 size;

源码解析:

take方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 进入等待。队列put进元素时会进行唤醒
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

put 方法:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 队列满了,则进入等待。元素出队列时会进行唤醒。
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

三,LinkedBlockingQueue

原理:

可以清晰的得出 ArrayBlockingQueue 是使用了独占锁的方式,要求两个操作进行时获得当先队列的独占锁,那么take() 和 put() 操作就不可能真正的并发。它们会彼此等待对方释放资源,在这种情况下所竞争会比较激烈,从而会影响到高并发的效率。可以使用 LinkedBlockingQueue 解决这一问题。

LinkedBlockingQueue 为了解决这一问题,采用锁分离的方式进行实现,take() 函数和 put() 函数分别实现了从队列中取得数据和往队列添加数据的功能。
换句话说就会说 take() 方法和 put()方法分别有专门的锁进行控制,由于 take() 方法是操作队尾,put() 方法操作队首,又因为 LinkedBlockingQueue 是基于链表的方式实现,因此两个操作不受影响。


源码解析:

LinkedBlockingQueue中的字段信息 :

/**take, poll的重入锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 不为空的条件 */
private final Condition notEmpty = takeLock.newCondition();

/** put, offer的重入锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 队满条件 */
private final Condition notFull = putLock.newCondition();

接下来看一下put方法是如何进行入队操作:

/**
 * 将指定元素插入此队列的尾部, 等待队列空间可用。
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // 保持计数为负,表示失败,除非设定。
  	int c = -1;
    Node<E> node = new Node<E>(e);
  	// putLock锁。
    final ReentrantLock putLock = this.putLock;
  	// 链表长度,原子操作。
    final AtomicInteger count = this.count;
  	// 获得锁,并且响应中断,put操作只有一个线程操作。
    putLock.lockInterruptibly();
    try {
        // 如果链表长度等着capacity,代表队列已满,则等待队列为空。
        while (count.get() == capacity) {
            notFull.await();
        }
      	// 将元素插入队列末尾。
        enqueue(node);
      	// c为count加1前的值,这里是原子操作,它会进行CAS,因为现在是两个线程进行操作,有可能put的时候也进行take操作,所以要保证原子性。
        c = count.getAndIncrement();
      	// 当c+1不是最大值时,通知notFull,队列未满可继续添加元素,通知其他线程。
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
  	// c代表插入前的的值,所以队列为空的时候c=0,此时已经插入了数据所以c本来应该不为0,所以需要通知队列元素插入成功。
    if (c == 0)
        signalNotEmpty();
}

通过源码可以清晰得到put方法是如何进行操作的,首先获取putLock 锁,获取队列的原子类型的长度,如果当前队列的长度与队列最大长度相等说明队列未满,则等待队列为空的时候插入数据,当队列未满时,可直接插入数据到队尾,由于 AtomicInteger 的方法 getAndIncrement 返回的是操作之前的值,比如 5 原子性增加 1,它会返回5 而不是返回增加后的 6,c 存放的是 count 元素加 1 之前的值,也就是该队列为空的时候 c 的长度是为 0,当执行完了 put 方法后,实际的 count 为 1,但是这里因为存放的是加1前的值,所有 c=0,代表队列中有数据通知notEmpty 可以进行 take 了。


接下来说一下take方法的源码是如何实现的:

/**
 * 从队头获取元素,等待队列有数据可读。
 */
public E take() throws InterruptedException {
    E x;
  	// 本地保存变量。
    int c = -1;
  	// 队列长度。
    final AtomicInteger count = this.count;
  	// 获取take重入锁。
    final ReentrantLock takeLock = this.takeLock;
  	// 获得锁,并且响应中断操作,并且只有一个线程进入take方法。
    takeLock.lockInterruptibly();
    try {
      	// 如果队列为空则等待队列不为空时进行获取操作。
        while (count.get() == 0) {
            notEmpty.await();
        }
      	// 出队列操作。
        x = dequeue();
      	// c保存减1前的值。
        c = count.getAndDecrement();
      	// 如果队列还有元素则可通知其他线程进行take操作。
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
  	// c如果是capacity的时候代表之前队列存在过满的情况,进行take方法后则表示队列有空间可用可进行put操作,通知notFull进行put操作。
    if (c == capacity)
        signalNotFull();
    return x;
}

通过上面的源码可以看到,take 方法获取的是 takeLock 重入锁,并且当前线程进入到 take 方法后,其他线程是不允许同时进入到 take 方法中,首先判断队列的长度是不是为 0,如果队列为 0 则代表队列中无数据可消费,则进行等待,等待队列中有元素时进行 take 后的操作,如果队列长度不为 0,则进行 dequeue 方法,出队列操作,将 head 节点指向下一个节点,将当前head值返回,当 c 大于 1 时,代表还有元素可以 take,通知其他线程进行take 操作,c 如果是 capacity 的时候,代表之前队列存在过满的情况,进行这次 take 方法后队列有空间可用,所以可以通知 notFull 进行 put 操作。


总结:

  • LinkedBlockingQueue是通过锁分离的方式进行控制,减少了 take 和 put 之间的锁竞争;
  • LinkedBlockingQueue 是通过链表的方式实现,所以进行锁分离时不会冲突,因为入队和出队分别作用于队尾和队首;
  • 内部采用了原子操作类(CAS)进行控制链表长度;
  • 入队后,如果之前队列为空时,会唤醒 take 方法,队列已有数据可进行 take,反之,出队后,队列之前已满,则唤醒 put 方法,队列已有空闲位置可进行 put 操作;
0

评论区