背景
Netty源码中大量使用了Future和Promise,学习ChannelFuture和ChannelFuture有助于理解Netty的设计思路。
本文的重点内容在于梳理清楚这些类的关系以及结合源码实现介绍这些类的作用,其中核心逻辑在于DefaultPromise和DefaultChannelPromise,Netty中Future和Promoise相关的类和继承关系如下所示:
图中父类中有两个Future接口,一个是JUC定义的Future(后续用juc.Future表示),一个是Netty继承JUC.Future定义的Future.
1.juc.Future和Future
juc.Future源码如下所示:
public interface Future<V> {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 是否已被取消
boolean isCancelled();
// 任务是否已执行完成
boolean isDone();
// 阻塞式获取结果信息
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
在FutureTask源码分析中对juc.Future的接口和作用已进行过较为详细地介绍,这里不再赘述。
Netty中Future接口定义如下:
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
}
从四个角度对juc.Future进行了增强:
[1] 状态相关
相对于juc.Future判断是否已完成isDone()
、是否已取消isCancelled()
,更细致地定义了是否已成功isSuccess()
,且定义了用于保存任务失败场景的异常对象cause()
;
[2] 引入了监听器的概念并提供了添加和删除监听器接口,以支持异步回调;
[3] 引入了sync和await阻塞等待任务,并分别为超时和响应中断重载了方法;
[4] 获取方式引入了非阻塞的getNow()
方法获取执行结果。
2.AbstractFuture
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get() throws InterruptedException, ExecutionException {
// 阻塞等待
await();
// 能执行到这里,说明阻塞已被唤醒,任务已执行完成或阻塞等待被中断
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 等待timeout后自动苏醒,不需要被动唤醒
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}
AbstractFuture内容较为简单,仅实现了两个阻塞的get方法。核心是借助await和await(timeout)实现阻塞,等待任务执行完成或被中断,然后调用getNow()非阻塞式地获取执行结果。如果执行成功,通过getNow()返回执行结果;如果执行失败,cause()返回异常类,get方法会抛出异常。
3.Promise和DefaultPromise
Promise作为Future的子类,其接口定义如下:
public interface Promise<V> extends Future<V> {
// Promise相对Future拥有可以操作结果值的能力
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();
// 重载await/sync和listener的返回值为Promise
@Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override Promise<V> await() throws InterruptedException;
@Override Promise<V> awaitUninterruptibly();
@Override Promise<V> sync() throws InterruptedException;
@Override Promise<V> syncUninterruptibly();
}
相对于Future的只读提供了写(操作结果)的能力,如setSuccess/trySuccess, setFailure/tryFailure等; 另外,将Future中await和sync和监听器接口的返回值重载为Promise类型。
3.1 DefaultPromise属性
DefaultPromise作为Promise的实现类,是一个完整的类,可以直接使用。先看一下属性:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 存在执行结果或异常对象(执行失败)
private volatile Object result;
// Promise内部用于执行任务的线程池对象,netty框架中,使用NioEventLoop线程
private final EventExecutor executor;
// 监听器列表容器
private Object listeners;
// 因获取结果阻塞等待的线程数量
private short waiters;
//...
}
[1] result
用于存放执行结果,初始值为空。当任务正常执行时被设置执行结果,当任务执行异常时,被设置异常对象。
Netty引入原子对象包装result, 用于原子性地读取和设置result的值:
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// --同时引入几个常量对象--
private static final Object SUCCESS = new Object();
// UNCANCELLABLE用于表示设置Promise不可被取消
private static final Object UNCANCELLABLE = new Object();
// CANCELLATION_CAUSE_HOLDER用于设置异常结果
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace( new CancellationException(), DefaultPromise.class, "cancel(...)"));
此时,RESULT_UPDATER存在以下几种用法:
// 将result设置为不可取消状态
RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)
// 将result设置为objResult(只能从null或UNCANCELLABLE的初始状态)
RESULT_UPDATER.compareAndSet(this, null, objResult)
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)
// 将result设置为异常状态
RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)
RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce));
Promise任务的取消:
当任务执行完成后,不可取消任务;也可以在任务执行前,将任务标识为禁止取消:
public boolean cancel(boolean mayInterruptIfRunning) {
// 只有初始状态才可以取消任务
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
DefaultPromise提供了如下方法根据result的值返回Promise的状态:
// (1)是否成功,result被设置了正常值
@Override
public boolean isSuccess() {
Object result = this.result;
return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
}
// (2)是否已执行完成,被设置了正常值或者异常对象(执行失败也是执行完成)
@Override
public boolean isDone() {
return isDone0(result);
}
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}
// (3)只有初始状态,才可以被取消
@Override
public boolean isCancellable() {
return result == null;
}
// (4)是否被取消了
@Override
public boolean isCancelled() {
return isCancelled0(result);
}
private static boolean isCancelled0(Object result) {
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}
[2] waiters
用于计数阻塞在获取该Promise/Future对象执行结果的线程数,这些线程调用Object的wait方法而处于等待队列中。当Promise设置为执行成功或者执行失败时,将会唤醒这些线程。因此,DefaultPromise源码中会看到synchronized和wait()/notifyAll()的使用。
线程的状态如等待和阻塞、同步队列等已在"线程的状态"经过系统和全面地介绍,这里不再赘述。
[3] listeners
监听器是一个列表对象,与该属性对应的方法有添加、删除、调用监听器。向列表中添加和删除元素的逻辑较为简单,这里重点看一下对监听器的调用,当任务执行完成时会触发notifyListeners方法:
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
notifyListeners方法会将调用监听器的方法提交给executor线程池对象进行(netty框架中对应NioEventLoopGroup).
继续跟进notifyListenersNow()方法:
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
逻辑较为简单,遍历listeners并调用notifyListener0方法:
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
在try-catch绝对保护下调用监听器的operationComplete方法。
3.2 DefaultPromise方法
[1] await方法
DefaultPromise为不同场景提供了不同的await方法,其中:await()和await(timeout)的区别仅在于是否持续休眠或者仅休眠一段时间;类似wait()之于wait(timeout)。await与awaitUninterruptibly的区别在于是否响应中断,前者遇到中断时会抛出异常,后者会忽略中断信号。以await为代表介绍await系列方法的实现:isDone()
public Promise<V> await() throws InterruptedException {
// 任务已完成,直接返回
if (isDone()) {
return this;
}
// 响应中断,如果调用await方法的线程被终端,则抛出异常
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁鉴定
checkDeadLock();
//synchronized-double-check
synchronized (this) {
while (!isDone()) {
// 等待线程计数+1
incWaiters();
try {
// 阻塞等待任务执行完成
wait();
} finally {
// 等待线程计数-1
decWaiters();
}
}
}
return this;
}
isDone()表示任务是否执行完成:执行完成直接返回;没有完成,调用Object的wait()方法,线程陷入同步队列阻塞等待,注意这里的锁对象是Promise本身。
这里还有个点需要注意:checkDeadLock()用于进行死锁鉴定,在死锁发送前抛出异常,从而防止死锁。
DefaultPromise的线程池对象不能阻塞在该Promise对象上,因为同步队列中的阻塞的线程依赖该线程池调用notifyAll唤醒,否则陷入死锁。
[2] sync方法
与await一样,sync也提供了响应中断和不响应中断两个方法, 这里也仅以sync()为例进行介绍:
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
@Override
public Promise<V> syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
逻辑较为简单,调用await()阻塞直到任务执行完成,任务异常时抛出异常。
[3] get方法
DefaultPromise提供了三个版本的get方法: get()、get(timeout)、getNow(). 其中getNow()不阻塞,当任务未完成时返回空,而get()和get(timeout)当任务未完成时阻塞等待,区别是get(timeout)超时后主动醒来,而 get()持续陷入阻塞状态知道被notify/notifyAll唤醒。
这里以get()为例进行介绍:
@Override
public V get() throws InterruptedException, ExecutionException {
Object result = this.result;
// 任务未完成,则阻塞等待任务完成
if (!isDone0(result)) {
await();
result = this.result;
}
// 结果为空
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
// 任务设置为异常状态时,抛出异常
Throwable cause = cause0(result);
if (cause == null) {
return (V) result;
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
逻辑较为简单,如果任务已完成,则直接返回result或抛出异常(任务处于失败状态), 否则调用await()阻塞等待任务完成后被唤醒,再返回result或抛出异常。
[4] setSuccess和trySuccess
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}
setSuccess和trySuccess通过调用setSuccess0设置正常结果值,区别是setSuccess设值失败时会抛出异常,而trySuccess不会抛出异常。进一步查看setSuccess0逻辑:
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
setSuccess0调用setValue0实现设值功能,核心是checkNotifyWaiters()和notifyListeners(),前者是唤醒因获取执行结构而阻塞的线程,后者是回调监听器。外层有一个判断逻辑,只有result的值为null或者UNCANCELLABLE时才会执行唤醒和调用监听器的逻辑,否则直接返回false.
即,只有第一次设置才会返回true(注意:设置Promise为不可取消状态 不属于设值,isDone仍返回false), 其他场景返回false.
继续查看checkNotifyWaiters:
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
如果有因获取Future执行结果而阻塞的线程,则通过notifyAll唤醒。
继续查看notifyListeners():
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
notifyListeners()方法的核心逻辑是notifyListenersNow(),notifyListenersNow已在前文介绍过,依次调用监听器方法。
这里需要注意的是notifyListenersNow()的调用由Promise关联的线程池执行,在Netty中调用监听器的逻辑由Promise关联的NioEventLoop执行(唤醒阻塞线程也是)。
[5] setFailure和tryFailure
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
@Override
public boolean tryFailure(Throwable cause) {
return setFailure0(cause);
}
setFailure和tryFailure通过调用setFailure0设置异常结果值,区别是setFailure设值失败时会抛出异常,而tryFailure不会抛出异常。进一步查看setFailure0逻辑:
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
setFailure0调用setValue0实现设值功能,逻辑与setSuccess和trySuccess相同。
4.ChannelFuture和ChannelPromise和DefaultChannelPromise
4.1 ChannelFuture
public interface ChannelFuture extends Future<Void> {
Channel channel();
boolean isVoid();//Ignore
// ... 重写listener/await/sync方法的返回结果为ChannelFuture
}
ChannelFuture相对于Future引入了channel的概念,新增了一个channel()方法用于返回Future绑定的通道对象。
并将所有的listener/await/sync方法的返回结果重写为ChannelFuture类型。
4.2 ChannelPromise
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
// 引入无参的setSuccess和trySuccess方法
ChannelPromise setSuccess();
boolean trySuccess();
ChannelPromise unvoid();//Ignore
// ... 重写listener/await/sync方法的返回结果为ChannelPromise
}
ChannelPromise类继承了ChannelFuture和Promise接口,成为具有通道能力的Promise,同时引入了无参的setSuccess()和trySuccess()方法。然后将所有的listener/await/sync方法的返回结果重写为ChannelPromise类型。
4.3 DefaultChannelPromise
DefaultChannelPromise作为DefaultPromise接口的实现类,也作为DefaultPromise的子类,是一个具备通道能力的Promise。内部维持了一个Channel通道对象,可通过channel()返回返回该对象。
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
@Override
public Channel channel() {
return channel;
}
public DefaultChannelPromise(Channel channel) {
this.channel = checkNotNull(channel, "channel");
}
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
}
// ...
}
Netty构造DefaultChannelPromise时,使用public DefaultChannelPromise(Channel channel)
构造函数,而没有给父类设置线程池,因此executor()方法返回的线程池对象实际是通道绑定的NioEventLoop.
上述为DefaultChannelPromise关于实现channel的特性,其他方法如trysuccess/setsucess等均调用父类(DefaultPromise)方法实现,并返回当前对象,以setSuccess为例:
@Override
public ChannelPromise setSuccess(Void result) {
super.setSuccess(result);
return this;
}
5.CloseFuture和PendingRegistrationPromise
CloseFuture和PendingRegistrationPromise作为DefaultChannelPromise的子类,在DefaultChannelPromise的基础上分别进行了定制。
5.1 CloseFuture
static final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) {super(ch);}
@Override public ChannelPromise setSuccess() { throw new IllegalStateException();}
@Override public ChannelPromise setFailure(Throwable cause) {throw new IllegalStateException();}
@Override public boolean trySuccess() {throw new IllegalStateException();}
@Override public boolean tryFailure(Throwable cause) {throw new IllegalStateException();}
boolean setClosed() {
return super.trySuccess();
}
}
CloseFuture禁用了所有Promise提供的设置结果的方法,并单独提供了一个setClosed()方法,只有该方法调用才会设置Promise执行结果为完成。
在Netty中启动Netty的线程会阻塞在该CloseFuture上,直到注册异常或者关闭Netty时才会从阻塞中恢复。
5.2 GlobalEventExecutor
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
static final class PendingRegistrationPromise extends DefaultChannelPromise {
private volatile boolean registered;
void registered() {
registered = true;
}
PendingRegistrationPromise(Channel channel) {
super(channel);
}
@Override
protected EventExecutor executor() {
if (registered) {
return super.executor();
}
// The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
return GlobalEventExecutor.INSTANCE;
}
}
PendingRegistrationPromise引入了注册状态的概念,对应在内部维持了一个registered布尔变量以及获取注册状态的方法registered()。
从Netty功能的角度考虑,通道只有注册成功后,才会绑定NioEventLoop线程,DefaultChannelPromise的executor()才会返回线程对象,否则(注册失败时)返回空对象。PendingRegistrationPromise对注册异常场景提供了逃生门,使用GlobalEventExecutor.INSTANCE(替代NioEventLoop)唤醒阻塞在当前Promise上的线程。