AQS原理解析(三、condition原理)

pexels-markus-winkler-3828944

condition的作用

condition的使用场景其实很多,涉及到条件判断的并发场景都可以用到,比如:

  • 阻塞队列的ArrayBlockingQueue中做队列满和空的条件判断
  • CyclicBarrier中做阻塞与唤醒所有线程的判断
  • DelayQueue中的阻塞获取队列数据的判断
  • 线程池ThreadPoolExecutor中awaitTermination方法的条件判断

condition怎么用呢?

在使用synchronized时我们可以使用wait()、notify()、notifyAll()方法来调度线程,而condition提供了类似的方法:wait(),signal(),signalAll的功能,并且能够更加精细的控制等待的范围,像上面所说,jdk中使用了很多ReentrantLock和condition的配合来实现线程调度

我们看一个conditon最常见的使用方式:生产消费者的模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class ConditionTest {

LinkedList<String> lists = new LinkedList<>();

Lock lock = new ReentrantLock();

//集合是否满的条件判断
Condition fullCondition = lock.newCondition();

//集合是否空的条件判断
Condition emptyCondition = lock.newCondition();

//生产者
private void product(){
lock.lock();
try {
//假如集合大小为10
while (lists.size() == 10){
System.out.println("list is full");
fullCondition.await();
}
//生产一个5位的随机字符串
String randomString = getRandomString(5);
lists.add(randomString);
System.out.println(String.format("product %s size %d %s",randomString,lists.size(),Thread.currentThread().getName()));
//通知消费者可以消费了
emptyCondition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

//消费者
private String consume(){
lock.lock();
try{
while (lists.size() == 0){
System.out.println("list is empty");
emptyCondition.await();
}
String first = lists.removeFirst();
//通知生产者可以生产了
fullCondition.signalAll();
return first;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
return null;
}

/**
* 生成随机字符串
* @param length
* @return
*/
public static String getRandomString(int length){
String str="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Random random=new Random();
StringBuffer sb=new StringBuffer();
for(int i=0;i<length;i++){
int number=random.nextInt(62);
sb.append(str.charAt(number));
}
return sb.toString();
}

public static void main(String[] args) {

ConditionTest test = new ConditionTest();

ExecutorService executorService = Executors.newCachedThreadPool();

//线程个数控制消费的快还是生产的快
for(int i = 0;i<2;i++){

executorService.submit(()->{
System.out.println(Thread.currentThread().getName());
while (true){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
test.product();
}
});
}

for(int k = 0;k<1;k++){
executorService.submit(()->{
System.out.println("cousumestart");
while (true) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
String consume = test.consume();
System.out.println("consume " + consume+ " "+Thread.currentThread().getName() );
}
});
}

//等待输入,阻塞主线程不退出
try {
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (IOException e) {
e.printStackTrace();
}


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//部分输出日志
product qeV0r size 7 pool-1-thread-1
product xEUkA size 8 pool-1-thread-2
consume P5Je1 pool-1-thread-3
product rQS1D size 8 pool-1-thread-1
product QcEtf size 9 pool-1-thread-2
consume 2q7Fc pool-1-thread-3
product Z5rBg size 9 pool-1-thread-1
consume UBxBD pool-1-thread-3
product Tr5q2 size 9 pool-1-thread-2
product HXBdE size 10 pool-1-thread-1
list is full
consume aYDNR pool-1-thread-3
product ukjnk size 10 pool-1-thread-2
list is full
consume LBEdA pool-1-thread-3
product iK28H size 10 pool-1-thread-2
list is full
list is full

可以看到生产者线程有2个,消费者线程有1个,生产和消费的速度相同,用Thread.sleep控制,
生产速度大于消费速度,最后集合元素到10个的时候生产者调用fullCondition.await();阻塞,只有消费者消费后通过fullCondition.signalAll();通知生产者继续生产

同理添加消费者线程数,使消费的速度快与生产,则集合为空时会调用emptyCondition.await();阻塞,生产者生产后回调用emptyCondition.signalAll();通知消费者继续生产

相较于对象的wait()、notifyAll()方法不同的条件分开判断,颗粒度更小一些,唤醒的线程范围更精准

再看一下ArrayBlockingQueue的一个例子,在一段时间内阻塞获取队列数据,取不到则返回空:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
//notEmpty 是lock new出来的一个condition
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

condition的使用场景还多,下面我们就一起看看condition的实现原理吧,首先condition需要在AbstractQueuedSynchronizer实现类的

condition原理解析

我们知道AQS中维护了一个队列来控制线程的执行,condition中使用了另一个等待队列来实现条件的判断,condition必须在aqs的acquire获取锁后使用,调用condition.await()方法将添加一个node到条件队列中,在调用signal()或signalAll()后将此节点移出condition的等待队列放到锁的等待队列中去竞争锁,取到锁后继续执行后续逻辑。


condition有以下几个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//将等待时间最长的线程从condition等待队列放到锁的等待队列中
public final void signal()
//将所有等待线程从condition等待队列放到锁的等待队列中
public final void signalAll()
//condition的等待方法
public final void await() throws InterruptedException
//不可中断的wait
public final void awaitUninterruptibly()
//几个有时间参数的wait方法
public final long awaitNanos(long nanosTimeout)
throws InterruptedException
public final boolean awaitUntil(Date deadline)
throws InterruptedException
public final boolean await(long time, TimeUnit unit)
throws InterruptedException

先看一下最主要的await方法

AbstractQueuedSynchronizer.ConditionObject#await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

public final void await() throws InterruptedException {
//如果当前线程被中断了抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//(1)
int savedState = fullyRelease(node);//(2)
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//(3)
//挂起线程
LockSupport.park(this);
//中断情况的判断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后去抢锁,抢到后继续执行
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果阻塞中发生了中断,则抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
(1)addConditionWaiter

在condition等待队列尾部加入一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后一个节点不是condition状态(被取消状态)被取消状态是在fullyReleas方法中产生的
if (t != null && t.waitStatus != Node.CONDITION) {
//从头节点开始将被取消或者超时的节点移出队列
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//队列为空的情况
if (t == null)
firstWaiter = node;
else
//插入尾节点
t.nextWaiter = node;
lastWaiter = node;
return node;
}
(2)fullyRelease

能调用wait方法说明已经获取到锁了,fullyRelease方法就是提前调用解锁方法,将自己从lock的队列中移出,并返回当前节点的状态savedState,这里如果释放失败说明当前线程不在持有锁,状态错误,将节点设置成CANCELLED状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

release方法调用tryRelease释放锁并唤醒首节点,在ReentrantLock的实现中tryRelease会判断当前线程是否获取锁,所以在lock方法范围内使用condition会报IllegalMonitorStateException异常

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
(3)isOnSyncQueue

回到await方法,循环调用isOnSyncQueue判断是否在锁的等待队列中(注意不是condition的等待队列),不在锁的等待队列中则调用LockSupport.park(this)挂起线程。

1
2
3
4
5
6
7
8
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;

return findNodeFromTail(node);
}

awaitNanos方法

大致逻辑和await相同,就是多了一个时间的判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果时间小于0,直接从condition队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//如果大于自旋的阈值则使用parkNanos设置线程挂起的时间,否则继续自旋
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

signal()方法

signal的作用是将condition队列中等待时间最长的node转移到锁队列末尾,去重新抢锁

1
2
3
4
5
6
7
8
9

public final void signal() {
//有不同的实现,ReentrantLock中是判断持有锁的是否当前线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

doSignal

将condition中等待时间最长的节点调用transferForSignal方法放到锁队列中,循环调用是要寻找第一个不是cancelled状态的节点

1
2
3
4
5
6
7
8
9
       
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignalAll

doSignalAll是将所有等待队列中的节点放到锁队列末尾

1
2
3
4
5
6
7
8
9
10
       
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

transferForSignal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean transferForSignal(Node node) {

//cas设置节点为0状态,如果失败说明节点已经被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//添加到锁队列中
Node p = enq(node);
int ws = p.waitStatus;
//cancelled状态或者设置SIGNAL状态失败则唤醒此线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

condition中有很多线程与中断的细节处理,有兴趣的可以自己去看看源码

总结一下:

  • condition必须使用在lock中
  • condition提供了类似object.wait和notify的通信机制,但支持多个条件队列,使用上更灵活
  • condition的原理流程如下
    • 线程1获取锁
    • 线程1调用condition.await()进入condition等待队列并阻塞,释放锁给别的线程
    • 线程2获取锁,调用condition.signal,将condition等待队列中的线程1所在的node放在锁的等待队列中竞争锁