Java多线程——构建自定义的同步工具

发布于:2025-02-24 ⋅ 阅读:(14) ⋅ 点赞:(0)

状态依赖性的管理

指的是一些操作中有着基于状态的前提条件,如对于队列,不能从一个空的队列中删除元素,不能向一个满的队列中添加元素

abstract class BaseBoundedBuffer<V> {
    @GuardedBy("this")
    private final V[] buf;
    @GuardedBy("this")
    private int tail;
    @GuardedBy("this")
    private int head;
    @GuardedBy("this")
    private int count;

    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }

    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length) tail = 0;
        ++count;
    }

    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length) head = 0;
        --count;
        return v;
    }

    public synchronized final boolean isFull() {
        return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}

如上基于数组实现一个循环缓存,并提供了同步操作

将前提条件的失败传递给调用者

调用者需要捕获异常并在失败后重试

class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    protected GrumpyBoundedBuffer(int capacity) {
        super(capacity);
    }

    public synchronized void put(V v) throws IllegalStateException {
        if (isFull()) {
            throw new IllegalStateException("Full");
        }
        doPut(v);
    }

    public synchronized V take() throws IllegalStateException {
        if (isEmpty()) {
            throw new IllegalStateException("Empty");
        }
        return doTake();
    }
}

通过轮询与休眠实现简单的阻塞

若休眠时间过短则浪费CPU时间,若过长则响应性低

class SleepBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    protected SleepBoundedBuffer(int capacity) {
        super(capacity);
    }

    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(5000);
        }
    }

    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty()) {
                    return doTake();
                }
            }
            Thread.sleep(5000);
        }
    }
}

条件队列

使一组线程能够通过某种方式来等待特定的条件变为真,每个java对象都可以作为条件队列

  • wait()会自动释放锁,并让操作系统挂起当前线程,从而使其他线程获取锁并修改状态
  • notify()/notifyAll()会唤醒被被挂起的一个/全部线程,醒来后会重新获取锁
  • 内置条件队列提供不公平的排队操作,需要公平的排队操作可以使用Condition
class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    protected BoundedBuffer(int capacity) {
        super(capacity);
    }

    public synchronized void put(V v) throws InterruptedException {
        while (isFull())
            wait();
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty)
            notifyAll();
    }

    public synchronized V take() throws InterruptedException {
        while (isEmpty())
            wait();
        boolean wasFull = isFull();
        V v = doTake();
        if (wasFull)
            notifyAll();
        return v;
    }
}

只有同时满足下面情况才单独调用notify()

  • 所有等待线程的类型都相同:只有一个等待条件,并且所有线程在wait()返回后执行相同的操作
  • 单进单出:最多只能唤醒一个线程来执行

显式的Condition对象

Lock是广义的内置锁,Condition是广义的内置条件队列

public interface Condition {

    void await() throws InterruptedException;

	void awaitUninterruptibly();

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();

    void signalAll();
}

每个内置锁只能有一个相关联的内置条件队列,多个线程可能在同一个条件队列上等待不同的条件谓词

class ConditionBaseBoundedBuffer<T> {

    protected final Lock lock = new ReentrantLock();

    private final Condition notFull = lock.newCondition();

    private final Condition notEmpty = lock.newCondition();

    @GuardedBy("lock")
    private final T[] items = (T[]) new Object[1024];
    @GuardedBy("lock")
    private int tail, head, count;

    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[tail] = x;
            if (++tail == items.length)
                tail = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            T x = items[head];
            items[head] = null;
            if (++head == items.length)
                head = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

如上使用两个Conditiion,分别为notFull和notEmpty

  • 缓存满时put()将阻塞并等待notFull
  • 缓存空时take()将阻塞并等待notEmpty
  • 需要注意调用的是await()/signal()/signalAll(),而不是Object中的wait()/notify()/notifyAll()

AbstractQueuedSynchronizer(AQS)

AQS用于构建锁和同步器的框架,ReentrantLock、Semaphore 、CountDownLatch、 ReentrantReadWriteLock、 SynchronousQueue和 FutureTask是基于AQS 构建的,它们没有直接扩展AQS,而是委托给私有子类

一个简单的闭锁

任何调用await()的线程都将阻塞直到调用signal()将闭锁打开

  • acquireSharedInterruptibly()调用tryAcquireShared(),若失败则放入等待队列
  • releaseShared()调用tryReleaseShared()返回true将闭锁打开
class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            //如果闭锁是开的,操作成功,否则失败
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);    //打开闭锁
            return true;    //其他线程可以获取闭锁
        }
    }
}

java.util.concurrent同步器类中的AQS

同步器类没有直接扩展AQS,而是委托给私有子类