【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue

简介

上一篇【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue学习了并发队列ConcurrentLinkedQueue,它是一个非阻塞无界队列。本文来学习下JUC中的一个阻塞有界队列-LinkedBlockingQueue。

LinkedBlockingQueue

如图继承了AbstractQueue类,实现了BlockingQueue和Serializable接口

LinkedBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
// 如果没传capacity 则默认使用Integer.MAX_VALUE作为队列大小
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
//设置大小为capacity,并设置item为null的head和last辅助节点
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//加入队锁
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
//依次将Collection中的元素加入队列
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
//设置队列大小n
count.set(n);
} finally {
//解锁
putLock.unlock();
}
}

可以从构造函数看出,LinkedBlockingQueue是一个有界的队列,队列最大值为capacity,如果初始化时不设置队列大小,则默认大小为Integer.MAX_VALUE

put

将元素加入队列,如果队列满,则一直等待,直到线程被中断或被唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final int c;
final Node<E> node = new Node<E>(e);

//入队锁,如果收到中断信号,则抛出异常
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//如果队列满了,则通知线程进入await状态。
while (count.get() == capacity) {
notFull.await();
}
//将node加入队列
enqueue(node);
//队列元素数加一
c = count.getAndIncrement();
//如果队列没满,则唤醒await的线程进行入队操作
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//如果是第一次添加元素,则通知等待的读线程可以开始读数据了
if (c == 0)
signalNotEmpty();
}

offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
//将元素加入队列,如果队列满,则直接返回false
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果队列满了,则直接返回false。
if (count.get() == capacity)
return false;
final int c;
final Node<E> node = new Node<E>(e);
//加入队锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//再次判断, 队列是否满了,避免在第一次判断后和加锁前,队列被加满
if (count.get() == capacity)
return false;
//将node添加到队列中
enqueue(node);
c = count.getAndIncrement();
//如果队列没满,则唤醒await的线程进行入队操作
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//如果是第一次添加元素,则通知等待的读线程可以开始读数据了
if (c == 0)
signalNotEmpty();
return true;
}

/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
//将元素加入队列,可以设置等待超时时间,如果队列满,则等待timeout毫秒,超时返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final int c;
//加锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果队列满了,则等待timeout毫秒,超时则返回false
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
//入队
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//如果队列没满,则唤醒await的线程进行入队操作
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//如果是第一次添加元素,则通知等待的读线程可以开始读数据了
if (c == 0)
signalNotEmpty();
return true;
}

take

从队列中取出元素,如果队列为空,则一直等待,直到线程被中断或被唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
//加出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果队列为空,则通知线程进入await状态。
while (count.get() == 0) {
notEmpty.await();
}
//从队列头部取出元素
x = dequeue();
//count减一
c = count.getAndDecrement();
//如果队列不为空,则唤醒出队等待线程
if (c > 1)
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
//如果从满的队列中出列,则唤醒入队线程,队列已经不满了,可以添加元素了
if (c == capacity)
signalNotFull();
return x;
}

poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public E poll() {
final AtomicInteger count = this.count;
//如果队列为空,直接返回null
if (count.get() == 0)
return null;
final E x;
final int c;
//加出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//如果队列为空,直接返回null
if (count.get() == 0)
return null;
//出队,移除第一个数据节点
x = dequeue();
c = count.getAndDecrement();
//如果队列不为空,则唤醒出队等待线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果从满的队列中出列,则唤醒入队线程,队列已经不满了,可以添加元素了
if (c == capacity)
signalNotFull();
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
final E x;
final int c;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//如果队列为空,则等待timeout时间, 超时返回null
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//出队列
x = dequeue();
c = count.getAndDecrement();
//如果队列不为空,则唤醒出队等待线程
if (c > 1)
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
//如果从满的队列中出列,则唤醒入队线程,队列已经不满了,可以添加元素了
if (c == capacity)
signalNotFull();
return x;
}

peek

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E peek() {
final AtomicInteger count = this.count;
//如果队列为空,返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//队列不为空返回第一个数据节点的元素,不移除节点,为空则返回null
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}

remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
//如果移除的元素为null,在返回false
if (o == null) return false;
//加入队和出队锁
fullyLock();
try {
//遍历队列,存在元素o则移除,返回true,否则返回false
for (Node<E> pred = head, p = pred.next;
p != null;
pred = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, pred);
return true;
}
}
return false;
} finally {
//释放入队锁和出队锁
fullyUnlock();
}
}

add、element

都在AbstractQueue中实现,上文【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue中已分析,不再赘述。

总结

通过源码分析,我们了解到了入队和出队的机制。初始化时,需要设置队列大小, 在队列满时,入队操作会等待,队列为空时,出队操作会等待。即LinkedBlockingQueue是一个有界的阻塞队列。
和ConcurrentLinkedQueue对比,LinkedBlockingQueue采用锁分离,比较适合生产和消费频率差不多的场景,并且锁同步更适合单消费者的任务队列,而ConcurrentLinkedQueue使用CAS,并发性能较高更适合消费者多的消息队列。
在常用线程池中,Executors.newFixedThreadPool也是采用的LinkedBlockingQueue作为workQueue,在线程数超过corePoolSize后,会将任务加入到workQueue中等待处理。关于线程池的使用,后面会详细展开。