一,BlockingQueue
阻塞队列 (BlockingQueue) 是 Java util.concurrent 包下重要的数据结构,BlockingQueue 提供了线程安全的队列访问方式:
当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。
并发包下很多高级同步类的实现都是基于BlockingQueue实现的。 在最常见的使用到这个阻塞队列的地方,就是我们耳熟能详的线程池里面了,作为我们线程池的一大最大参与者,也是AQS的一个具体实现,因此可以好好的深入了解一下这个BlockingQueue阻塞队列。
主要常用的阻塞队列有如下:
- ArrayBlockingQueue: 由数组支持的有界队列;
- LinkedBlockingQueue: 由链接节点支持的可选有界队列;
- PriorityBlockingQueue: 由优先级堆支持的无界优先级队列;
- DelayQueue: 由优先级堆支持的、基于时间的调度队列;
二,ArrayBlockingQueue
原理:
阻塞队列(BlockingQueue)是在队列的基础上增加了两个附加操作, 在队列为空的时候,获取元素的线程会等待队列变为非空(take 方法)。 当队列满时,存储元素的线程会等待队列可用(put 方法)。
要实现这样的一个阻塞队列,需要用到两个关键的技术,队列元素的存储、以及 线程阻塞和唤醒。
而 ArrayBlockingQueue 是基于数组结构的阻塞队列,也就是队列元素是存储在 一个数组结构里面,并且由于数组有长度限制,为了达到循环生产和循环消费的目的,ArrayBlockingQueue 用到了循环数组。
而线程的阻塞和唤醒,用到了 J.U.C 包里面的 ReentrantLock 和 Condition。 Condition 相当于 wait/notify 在 JUC 包里面的实现。
由于阻塞队列的特性,可以非常容易实现生产者消费者模型,也就是生产者只需 要关心数据的生产,消费者只需要关注数据的消费,所以如果队列满了,生产者 就等待,同样,队列空了,消费者也需要等待。
常用方法:
- add(E e):把 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则报异常;
- offer(E e):表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false;
- put(E e):把 e 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续;
- poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;
- take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止;
- remainingCapacity():剩余可用的大小。等于初始容量减去当前的 size;
源码解析:
take方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 进入等待。队列put进元素时会进行唤醒
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
put 方法:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 队列满了,则进入等待。元素出队列时会进行唤醒。
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
三,LinkedBlockingQueue
原理:
可以清晰的得出 ArrayBlockingQueue 是使用了独占锁的方式,要求两个操作进行时获得当先队列的独占锁,那么take() 和 put() 操作就不可能真正的并发。它们会彼此等待对方释放资源,在这种情况下所竞争会比较激烈,从而会影响到高并发的效率。可以使用 LinkedBlockingQueue 解决这一问题。
LinkedBlockingQueue 为了解决这一问题,采用锁分离的方式进行实现,take() 函数和 put() 函数分别实现了从队列中取得数据和往队列添加数据的功能。
换句话说就会说 take() 方法和 put()方法分别有专门的锁进行控制,由于 take() 方法是操作队尾,put() 方法操作队首,又因为 LinkedBlockingQueue 是基于链表的方式实现,因此两个操作不受影响。
源码解析:
LinkedBlockingQueue中的字段信息 :
/**take, poll的重入锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 不为空的条件 */
private final Condition notEmpty = takeLock.newCondition();
/** put, offer的重入锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 队满条件 */
private final Condition notFull = putLock.newCondition();
接下来看一下put方法是如何进行入队操作:
/**
* 将指定元素插入此队列的尾部, 等待队列空间可用。
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// 保持计数为负,表示失败,除非设定。
int c = -1;
Node<E> node = new Node<E>(e);
// putLock锁。
final ReentrantLock putLock = this.putLock;
// 链表长度,原子操作。
final AtomicInteger count = this.count;
// 获得锁,并且响应中断,put操作只有一个线程操作。
putLock.lockInterruptibly();
try {
// 如果链表长度等着capacity,代表队列已满,则等待队列为空。
while (count.get() == capacity) {
notFull.await();
}
// 将元素插入队列末尾。
enqueue(node);
// c为count加1前的值,这里是原子操作,它会进行CAS,因为现在是两个线程进行操作,有可能put的时候也进行take操作,所以要保证原子性。
c = count.getAndIncrement();
// 当c+1不是最大值时,通知notFull,队列未满可继续添加元素,通知其他线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c代表插入前的的值,所以队列为空的时候c=0,此时已经插入了数据所以c本来应该不为0,所以需要通知队列元素插入成功。
if (c == 0)
signalNotEmpty();
}
通过源码可以清晰得到put方法是如何进行操作的,首先获取putLock 锁,获取队列的原子类型的长度,如果当前队列的长度与队列最大长度相等说明队列未满,则等待队列为空的时候插入数据,当队列未满时,可直接插入数据到队尾,由于 AtomicInteger 的方法 getAndIncrement 返回的是操作之前的值,比如 5 原子性增加 1,它会返回5 而不是返回增加后的 6,c 存放的是 count 元素加 1 之前的值,也就是该队列为空的时候 c 的长度是为 0,当执行完了 put 方法后,实际的 count 为 1,但是这里因为存放的是加1前的值,所有 c=0,代表队列中有数据通知notEmpty 可以进行 take 了。
接下来说一下take方法的源码是如何实现的:
/**
* 从队头获取元素,等待队列有数据可读。
*/
public E take() throws InterruptedException {
E x;
// 本地保存变量。
int c = -1;
// 队列长度。
final AtomicInteger count = this.count;
// 获取take重入锁。
final ReentrantLock takeLock = this.takeLock;
// 获得锁,并且响应中断操作,并且只有一个线程进入take方法。
takeLock.lockInterruptibly();
try {
// 如果队列为空则等待队列不为空时进行获取操作。
while (count.get() == 0) {
notEmpty.await();
}
// 出队列操作。
x = dequeue();
// c保存减1前的值。
c = count.getAndDecrement();
// 如果队列还有元素则可通知其他线程进行take操作。
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c如果是capacity的时候代表之前队列存在过满的情况,进行take方法后则表示队列有空间可用可进行put操作,通知notFull进行put操作。
if (c == capacity)
signalNotFull();
return x;
}
通过上面的源码可以看到,take 方法获取的是 takeLock 重入锁,并且当前线程进入到 take 方法后,其他线程是不允许同时进入到 take 方法中,首先判断队列的长度是不是为 0,如果队列为 0 则代表队列中无数据可消费,则进行等待,等待队列中有元素时进行 take 后的操作,如果队列长度不为 0,则进行 dequeue 方法,出队列操作,将 head 节点指向下一个节点,将当前head值返回,当 c 大于 1 时,代表还有元素可以 take,通知其他线程进行take 操作,c 如果是 capacity 的时候,代表之前队列存在过满的情况,进行这次 take 方法后队列有空间可用,所以可以通知 notFull 进行 put 操作。
总结:
- LinkedBlockingQueue是通过锁分离的方式进行控制,减少了 take 和 put 之间的锁竞争;
- LinkedBlockingQueue 是通过链表的方式实现,所以进行锁分离时不会冲突,因为入队和出队分别作用于队尾和队首;
- 内部采用了原子操作类(CAS)进行控制链表长度;
- 入队后,如果之前队列为空时,会唤醒 take 方法,队列已有数据可进行 take,反之,出队后,队列之前已满,则唤醒 put 方法,队列已有空闲位置可进行 put 操作;
评论区