任意一个java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者再使用方式以及功能特性上还是有差别的。
Object的监视器方法与Condition接口的对比:
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁 调用Lock.newConditon()获取Condition对象 |
调用方式 | 直接调用如:object.wait() | 直接调用如:conditon.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,再等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
Condition接口与示例
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
}finally {
lock.unlock();
}
}
public void conditionSignal(){
lock.lock();
try {
condition.signal();
}finally {
lock.unlock();
}
}
如代码所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。
Condition部分方法描述:
方法名称 | 描述 |
---|---|
void await() throws InterruptedException | 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回的情况,包括: 其他线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒 其他线程(调用interrupt()方法)中断当前线程 如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所对应的锁。 |
void awaitUninterruptibly() | 当前线程进入等待状态直到被通知,从方法名称上可以看出该方法对中断不敏感 |
long awaitNanos(long nanos-Timeout) throws Interrupted-Exception | 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余的时间,如果返回值是0或者负数,那么可以认定已经超时了 |
boolean awaitUntil(Datedeadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,方法返回true,否则,表示到了指定时间,方法返回false |
void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁 |
void signalAll() | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁 |
获取一个Condion必须通过Lock的newCondition()方法。下面通过一个有界队列的示例来深入了解Condition的使用方式。
有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”
public class BoundedQueue<T> {
private Object[] items;
private int addIndex,removeIndex,count;
private Lock lock = new ReentrantLock();
private Condition addThread = lock.newCondition();
private Condition removeThread = lock.newCondition();
public BoundedQueue(int size){
items = new Object[size];
}
/**
* 添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位”
* @param t
* @throws InterruptedException
*/
public void add(T t) throws InterruptedException {
lock.lock();
try {
while(count == items.length){
//当前添加线程释放锁,进入等待状态
addThread.await();
}
items[addIndex] = t;
if(++addIndex == items.length){
addIndex = 0;
}
++count;
//添加完元素,唤醒移除等待的线程
removeThread.signal();
}finally {
lock.unlock();
}
}
/**
* 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
* @return
* @throws InterruptedException
*/
public T remove() throws InterruptedException {
lock.lock();
try {
while(count == 0){
removeThread.await();
}
Object x = items[removeIndex];
if(++removeIndex == items.length){
removeIndex = 0;
}
--count;
addThread.signal();
return (T)x;
}finally {
lock.unlock();
}
}
}
上述代码,BoundedQueue通过add(T t)方法添加一个元素,通过remove()方法移出一个元素,以添加方法为例:
首先需要获得锁,目的是确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用addThread.await(),当前线程随之释放锁并进入等待状态。如果数组数量不等于数组长度,表示数组未满,则添加到数组中,同时通知等待在removeThread上的线程,数组中已经有新元素可以获取。
Condition的实现分析
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。
Condition的实现,主要包括:
- 等待队列
- 等待
- 通知
等待队列
等待队列是一个FIFO的队列,在队列中每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。
一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。等待队列的基本结构如下所示:
Condition拥有首尾节点的引用,而新增节点只须要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。
上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,对应关系如下:
Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。
等待
调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
Condition的await()方法,如下代码所示:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放同步状态,也就是释放锁0
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中。
如下为当前线程加入等待队列:
通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
ConditionObject的signal方法:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
节点从等待队列移动到同步队列的过程如下所示:
通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。
Codition的signalAll()方法,相当于对等待队列中的每一个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。