本文将剖析与ReentrantLock
密切相关的ConditionObject
的相关源码,并简要介绍ConditionObject
的实现原理
代码托管在https://github.com/zhongmingmao/concurrent_demo
关于ReentrantLock
的基本内容请参考「并发 - JUC - ReentrantLock - 源码剖析」,本文不再赘述
基础
使用场景
Condition
常用于生产者-消费者
的场景,例如ArrayBlockingQueue
,JUC
框架也有很多地方使用了Condition
,如下图所示
生产者-消费者
1 | /** |
Condition接口
1 | public interface Condition { |
一个很关键的地方:Condition
的实现类在重写Condition
的所有方法
都建议先持有与Condition关联的锁
,AQS
中的ConditionObject
就满足这一点,因此调用ConditionObject
的方法是线程安全
的,这里说的线程安全有一个前提,就是线程必须先持有独占锁
,下面的分析说线程安全的时候,不再重复说明这一前提
ConditionObject
实例化
在生产者-消费者
的代码里面,newCondition()
实际创建的是ConditionObject
对象
1 | // From ReentrantLock |
1 | // From Sync |
核心结构
1 | public class ConditionObject implements Condition, java.io.Serializable { |
在进一步分析之前,先行回顾一下AQS
中Node
的定义
1 | static final class Node { |
由此可见:
条件队列
仅有nextWaiter
,因此条件队列是单向非循环队列
,而同步队列是双向非循环队列
条件队列
中节点只有3种等待状态
:CANCELLED
:需要从条件队列中移除CONDITION
:等待被转移到同步队列0
:转移过程中或已经转移完成,在_transferAfterCancelledWait
或transferForSignal
_中设置,后面会详细分析
AQS
只能拥有1个同步队列
_,但可以拥有多个条件队列
_
同步队列与条件队列
条件队列
与同步队列
的关系大致如下:
简单说明(后面源码分析将详细介绍):
ReentrantLock.newCondition()
:创建一个新的ConditionObject
实例,每个ConditionObject
拥有firstWaiter
属性和lastWaiter
属性,对应一个条件队列
ConditionObject.await()
:将当前线程
包装成节点
后加入到对应的条件队列
并进行阻塞
,然后等待被转移
到同步队列
中ConditionObject.signal()
:将ConditionObject实例对应的条件队列中的节点
(从头结点开始往后遍历筛选)转移到AQS同步队列的队尾
,等待获得独占锁
,获得独占锁后,上面的ConditionObject.await()
方法返回,继续执行
源码分析
await
await()
方法的分析在本文中是最复杂的
!!
1 | // From ConditionObject |
addConditionWaiter
1 | // From ConditionObject |
unlinkCancelledWaiters
1 | // From ConditionObject |
逻辑示意图如下:
fullyRelease
1 | // From AQS |
1 | // From AQS |
1 | // From Sync |
isOnSyncQueue
1 | // From AQS |
findNodeFromTail
1 | // From AQS |
checkInterruptWhileWaiting
1 | // From ConditionObject |
transferAfterCancelledWait
1 | // From AQS |
reportInterruptAfterWait
1 | // From ConditionObject |
signal
有一点需要说明,ConditionObject.signal并不总是直接唤醒线程
,而是首先将节点从条件队列
转移到同步队列
,再依据在同步队列中前驱节点的等待状态
做不同的处理
- 如果
被转移的节点
在同步队列中的前驱节点没有被取消
,那么被转移的节点
在同步队列
中等待锁
- 如果
被转移的节点
在同步队列中的前驱节点被取消
了,才会直接唤醒被转移节点
的关联线程,这点比较重要,不要认为signal就是直接唤醒
1 | // From ConditionObject |
isHeldExclusively
1 | // From Sync |
doSignal
1 | // From ConditionObject |
transferForSignal
1 | // From AQS |
awaitUninterruptibly
前面分析的await()
方法是响应中断
的,本节介绍的waitUninterruptibly()
是不响应中断
的
1 | // From ConditionObject |
await(long time,TimeUnit unit)
前面分析的await()
方法是不限时等待
的,本节介绍的await(long time,TimeUnit unit)
是限时等待
的
1 | // From ConditionObject |