状态依赖性的管理
指的是一些操作中有着基于状态的前提条件,如对于队列,不能从一个空的队列中删除元素,不能向一个满的队列中添加元素
abstract class BaseBoundedBuffer<V> {
@GuardedBy("this")
private final V[] buf;
@GuardedBy("this")
private int tail;
@GuardedBy("this")
private int head;
@GuardedBy("this")
private int count;
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length) tail = 0;
++count;
}
protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length) head = 0;
--count;
return v;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}
如上基于数组实现一个循环缓存,并提供了同步操作
将前提条件的失败传递给调用者
调用者需要捕获异常并在失败后重试
class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
protected GrumpyBoundedBuffer(int capacity) {
super(capacity);
}
public synchronized void put(V v) throws IllegalStateException {
if (isFull()) {
throw new IllegalStateException("Full");
}
doPut(v);
}
public synchronized V take() throws IllegalStateException {
if (isEmpty()) {
throw new IllegalStateException("Empty");
}
return doTake();
}
}
通过轮询与休眠实现简单的阻塞
若休眠时间过短则浪费CPU时间,若过长则响应性低
class SleepBoundedBuffer<V> extends BaseBoundedBuffer<V> {
protected SleepBoundedBuffer(int capacity) {
super(capacity);
}
public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
Thread.sleep(5000);
}
}
public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty()) {
return doTake();
}
}
Thread.sleep(5000);
}
}
}
条件队列
使一组线程能够通过某种方式来等待特定的条件变为真,每个java对象都可以作为条件队列
- wait()会自动释放锁,并让操作系统挂起当前线程,从而使其他线程获取锁并修改状态
- notify()/notifyAll()会唤醒被被挂起的一个/全部线程,醒来后会重新获取锁
- 内置条件队列提供不公平的排队操作,需要公平的排队操作可以使用Condition
class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
protected BoundedBuffer(int capacity) {
super(capacity);
}
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait();
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty)
notifyAll();
}
public synchronized V take() throws InterruptedException {
while (isEmpty())
wait();
boolean wasFull = isFull();
V v = doTake();
if (wasFull)
notifyAll();
return v;
}
}
只有同时满足下面情况才单独调用notify()
- 所有等待线程的类型都相同:只有一个等待条件,并且所有线程在wait()返回后执行相同的操作
- 单进单出:最多只能唤醒一个线程来执行
显式的Condition对象
Lock是广义的内置锁,Condition是广义的内置条件队列
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
每个内置锁只能有一个相关联的内置条件队列,多个线程可能在同一个条件队列上等待不同的条件谓词
class ConditionBaseBoundedBuffer<T> {
protected final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
@GuardedBy("lock")
private final T[] items = (T[]) new Object[1024];
@GuardedBy("lock")
private int tail, head, count;
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
如上使用两个Conditiion,分别为notFull和notEmpty
- 缓存满时put()将阻塞并等待notFull
- 缓存空时take()将阻塞并等待notEmpty
- 需要注意调用的是await()/signal()/signalAll(),而不是Object中的wait()/notify()/notifyAll()
AbstractQueuedSynchronizer(AQS)
AQS用于构建锁和同步器的框架,ReentrantLock、Semaphore 、CountDownLatch、 ReentrantReadWriteLock、 SynchronousQueue和 FutureTask是基于AQS 构建的,它们没有直接扩展AQS,而是委托给私有子类
一个简单的闭锁
任何调用await()的线程都将阻塞直到调用signal()将闭锁打开
- acquireSharedInterruptibly()调用tryAcquireShared(),若失败则放入等待队列
- releaseShared()调用tryReleaseShared()返回true将闭锁打开
class OneShotLatch {
private final Sync sync = new Sync();
public void signal() {
sync.releaseShared(0);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared(int arg) {
//如果闭锁是开的,操作成功,否则失败
return getState() == 1 ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
setState(1); //打开闭锁
return true; //其他线程可以获取闭锁
}
}
}
java.util.concurrent同步器类中的AQS
同步器类没有直接扩展AQS,而是委托给私有子类