JDK源码分析-PriorityBlockingQueue

语言: CN / TW / HK

概述

前文「 JDK源码分析-PriorityQueue 」分析了优先队列 PriorityQueue,它既不是阻塞队列,而且线程不安全。本文分析线程安全的阻塞优先队列 PriorityBlockingQueue。它的继承结构如下:

PriorityBlockingQueue 与 PriorityQueue 的内部结构类似,也是物理上由数组、逻辑上由堆结构实现的,并且使用 ReentrantLock 实现线程安全。除此之外, 二者大部分操作都是类似的。

因此,有了前文的铺垫,这里相对更容易理解一些。下面分析其代码实现。

代码分析

主要成员变量


 

// 内部数组的默认初始化容量

private static final int DEFAULT_INITIAL_CAPACITY = 11;


// 内部数组的最大容量

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;


// 保存元素的内部数组

private transient Object[] queue;


// 队列中元素的数量

private transient int size;


// 队列中元素的比较器

private transient Comparator<? super E> comparator;


// 互斥锁(保证线程安全)

private final ReentrantLock lock;


// 表示队列非空的条件

private final Condition notEmpty;


// 扩容时使用的自旋锁,通过 CAS 获取(后面分析)

private transient volatile int allocationSpinLock;


// 一个普通的优先队列,主要用于序列化和反序列化

private PriorityQueue<E> q;

构造器


 

// 构造器 1:使用默认的初始化容量创建一个对象

public PriorityBlockingQueue() {

this(DEFAULT_INITIAL_CAPACITY, null);

}


// 构造器 2:使用给定的容量创建一个对象

public PriorityBlockingQueue(int initialCapacity) {

this(initialCapacity, null);

}


// 构造器 3:使用给定的容量和比较器创建一个对象

public PriorityBlockingQueue(int initialCapacity,

Comparator<? super E> comparator) {

if (initialCapacity < 1)

throw new IllegalArgumentException();

this.lock = new ReentrantLock();

this.notEmpty = lock.newCondition();

this.comparator = comparator;

this.queue = new Object[initialCapacity];

}

上面几个构造器都是比较简单的赋值。除此之外,还有一个用给定集合初始化的构造器,如下:


 

public PriorityBlockingQueue(Collection<? extends E> c) {

this.lock = new ReentrantLock();

this.notEmpty = lock.newCondition();

// 是否需要堆化

boolean heapify = true; // true if not known to be in heap order

// 是否需要筛选空值

boolean screen = true; // true if must screen for nulls

// 给定集合为 SortedSet

if (c instanceof SortedSet<?>) {

SortedSet<? extends E> ss = (SortedSet<? extends E>) c;

this.comparator = (Comparator<? super E>) ss.comparator();

heapify = false; // 已经有序,不需要再堆化

}

// 给定集合为 PriorityBlockingQueue

else if (c instanceof PriorityBlockingQueue<?>) {

PriorityBlockingQueue<? extends E> pq =

(PriorityBlockingQueue<? extends E>) c;

this.comparator = (Comparator<? super E>) pq.comparator();

screen = false; // 不需要筛选判空

if (pq.getClass() == PriorityBlockingQueue.class) // exact match

heapify = false; // 不需要堆化

}

// 集合转为数组

Object[] a = c.toArray();

int n = a.length;

// If c.toArray incorrectly doesn't return Object[], copy it.

if (a.getClass() != Object[].class)

a = Arrays.copyOf(a, n, Object[].class);

// 集合内所有元素都不能为空

if (screen && (n == 1 || this.comparator != null)) {

for (int i = 0; i < n; ++i)

if (a[i] == null)

throw new NullPointerException();

}

this.queue = a;

this.size = n;

if (heapify)

heapify(); // 堆化

}

堆化操作 heapify 代码如下:


 

private void heapify() {

Object[] array = queue;

int n = size;

int half = (n >>> 1) - 1;

Comparator<? super E> cmp = comparator;

// 根据比较器(Comparator)是否为空,采用不同的策略

// PS: 二者操作基本一样,只是 Comparator 和 Comparable 的区别

if (cmp == null) {

for (int i = half; i >= 0; i--)

siftDownComparable(i, (E) array[i], array, n);

}

else {

for (int i = half; i >= 0; i--)

siftDownUsingComparator(i, (E) array[i], array, n, cmp);

}

}

siftDownUsingComparator 代码如下:


 

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,

int n,

Comparator<? super T> cmp) {

if (n > 0) {

// 数组的中间位置

int half = n >>> 1;

while (k < half) {

// 获取索引为 k 的节点左子节点索引

int child = (k << 1) + 1;

// 获取 child 的值

Object c = array[child];

// 获取索引为 k 的节点右子节点索引

int right = child + 1;

// 比较左右子节点的值,取较小的一个

if (right < n && cmp.compare((T) c, (T) array[right]) > 0)

c = array[child = right];

// 给定的元素 x 与其较小的子节点的值比较,若 x 不大于子节点的值,停止交换

if (cmp.compare(x, (T) c) <= 0)

break;

// 将 x 与其较小的子节点互换位置

array[k] = c;

k = child;

}

array[k] = x;

}

}

该方法与 PriorityQueue 中的 siftDownUsingComparator 方法操作几乎完全一致,可参考前文的分析,这里不再赘述( siftDownComparable 方法亦是如此)。

入队方法 :add(E), put(E), offer(E, timeout, TimeUnit), offer(E)


 

public boolean add(E e) {

return offer(e);

}


public void put(E e) {

offer(e); // never need to block

}


public boolean offer(E e, long timeout, TimeUnit unit) {

return offer(e); // never need to block

}

上述三个方法内部都是通过 offer(e) 方法实现的,因此只需分析 offer(e) 方法即可:


 

public boolean offer(E e) {

// 插入元素不能为空

if (e == null)

throw new NullPointerException();

// 获取锁,保证线程安全

final ReentrantLock lock = this.lock;

lock.lock();

int n, cap;

Object[] array;

// 如果容量不够,则进行扩容(注意这里是一个循环)

while ((n = size) >= (cap = (array = queue).length))

tryGrow(array, cap); // 尝试扩容

try {

Comparator<? super E> cmp = comparator;

// 根据 Comparator 是否为空采用不同的堆化策略

if (cmp == null)

siftUpComparable(n, e, array);

else

siftUpUsingComparator(n, e, array, cmp);

size = n + 1;

// 有新元素插入了,唤醒 notEmpty 条件下等待的线程(消费者)

notEmpty.signal();

} finally {

// 释放锁

lock.unlock();

}

return true;

}

下面分析一下扩容操作 tryGrow:


 

private void tryGrow(Object[] array, int oldCap) {

// 释放锁

lock.unlock(); // must release and then re-acquire main lock

Object[] newArray = null;

// 尝试以 CAS 方式修改 allocationSpinLock 的值(将 0 改为 1)

if (allocationSpinLock == 0 &&

UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

0, 1)) {

try {

// 若旧容量 n 较小(小于 64),则扩容为 2 * n + 2,否则扩容为 1.5 * n

int newCap = oldCap + ((oldCap < 64) ?

(oldCap + 2) : // grow faster if small

(oldCap >> 1));

if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow

int minCap = oldCap + 1;

if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

throw new OutOfMemoryError();

newCap = MAX_ARRAY_SIZE;

}

// 创建一个新数组

if (newCap > oldCap && queue == array)

newArray = new Object[newCap];

} finally {

// 将 allocationSpinLock 重置为 0

allocationSpinLock = 0;

}

}

// newArray 为空表示未进行上述扩容操作,则当前线程让出 CPU 时间

if (newArray == null) // back off if another thread is allocating

Thread.yield();

// 尝试获取锁

lock.lock();

// 到这里表示扩容成功

// queue == array 保证老数据复制一次

if (newArray != null && queue == array) {

// 扩容后的新数组

queue = newArray;

System.arraycopy(array, 0, newArray, 0, oldCap);

}

}

这个扩容方法比较有意思:它刚开始会释放锁,而后再重新获取锁。

1. 为什么刚开始要释放锁?

由于该锁是全局的, 其他大部分公有(public)方法也会用到;而扩容操作又相对比较耗时,若这里不释放,则某个线程扩容时其他方法调用可能会阻塞。

2. 释放锁之后如何保证线程安全?

这就用到了成员变量 allocationSpinLock,使用了 Unsafe 类的 CAS 操作。它尝试将  allocationSpinLock 的值设置为 1,而一旦操作成功,其他线程就无法进入,直到该线程将它重置为 0. 这就保证了同一时间内只能有一个线程在扩容。

3. 在释放锁后的扩容操作中,先后可能会有多个线程扩容,也即会产生 多个新容量的空数组此时 它们都未指向 原先的数组 queue),如何避免老数据多次复制到新数组呢?

代码里用到了 queue == array 这个判断。

比如线程 T1 和 T2 都对原数组进行了扩容,得到了两个 newArray,在后面复制老数据时,若其中一个线程已经对 queue 重新赋值并复制后,由于 queue 已经改变, 后面的线程就不会再复制一次了。

出队方法 :poll(), take(), peek()


 

// 出队

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return dequeue();

} finally {

lock.unlock();

}

}

// 出队(队列为空时阻塞)

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

E result;

try {

while ( (result = dequeue()) == null)

notEmpty.await();

} finally {

lock.unlock();

}

return result;

}


// 有超时等待的出队

public E poll(long timeout, TimeUnit unit) throws InterruptedException {

long nanos = unit.toNanos(timeout);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

E result;

try {

while ( (result = dequeue()) == null && nanos > 0)

nanos = notEmpty.awaitNanos(nanos);

} finally {

lock.unlock();

}

return result;

}

可以看到这几个出队的操作都加了锁,内部都调用了 dequeue 方法:


 

private E dequeue() {

int n = size - 1;

if (n < 0)

return null;

else {

Object[] array = queue;

// 取数据中的第一个元素

E result = (E) array[0];

// 获取最后一个元素

E x = (E) array[n];

// 将最后一个元素置空,并恢复堆结构

array[n] = null;

Comparator<? super E> cmp = comparator;

if (cmp == null)

siftDownComparable(0, x, array, n);

else

siftDownUsingComparator(0, x, array, n, cmp);

size = n;

return result;

}

}

该方法与 PriorityQueue 的出队操作  poll() 类似,也不再赘述。

小结

1. PriorityBlockingQueue  是优先队列的阻塞 方式 实现,它 与 PriorityQueue 内部结构类似,即物理结构是可变数组、逻辑结构是堆;

2.  PriorityBlockingQueue 内部元素不能为空,且可比较,使用 ReentrantLock 保证 线程安全。

参考链接:

https://juejin.im/post/5cc258796fb9a03228616e6e

https://blog.csdn.net/codejas/article/details/89190774

相关阅读:

JDK源码分析-PriorityQueue

JDK源码分析-ReentrantLock

Stay hungry, stay foolish.

分享到: