源码
Callable
`Callable` 是一个带返回值且可抛出异常的任务接口。
/**
 * A task that returns a value of type {@code V} and is allowed to throw a checked exception.
 *
 * @param <V> the type returned by {@link #call()}
 */
@FunctionalInterface
public interface Callable<V> {
/**
 * Runs the task.
 *
 * @return the task's result
 * @throws Exception if the task cannot produce a result
 */
V call() throws Exception;
}
WebAsyncTask
`WebAsyncTask` 是 `Callable` 的增强版本,支持自定义超时时间和执行线程池,实现更灵活的异步请求处理。
/**
 * Bundles a {@link Callable} with an optional timeout, an optional executor (given
 * directly or as a bean name), and optional timeout, error and completion callbacks.
 *
 * @param <V> the value type produced by the wrapped callable
 */
public class WebAsyncTask<V> implements BeanFactoryAware {

    private final Callable<V> callable;

    @Nullable
    private final Long timeout;

    @Nullable
    private final AsyncTaskExecutor executor;

    @Nullable
    private final String executorName;

    @Nullable
    private BeanFactory beanFactory;

    @Nullable
    private Callable<V> timeoutCallback;

    @Nullable
    private Callable<V> errorCallback;

    @Nullable
    private Runnable completionCallback;

    /**
     * Creates a task with no timeout and no executor of its own.
     *
     * @param callable the work to run; must not be null
     */
    public WebAsyncTask(Callable<V> callable) {
        Assert.notNull(callable, "Callable must not be null");
        this.callable = callable;
        this.timeout = null;
        this.executorName = null;
        this.executor = null;
    }

    /**
     * Creates a task with a timeout and no executor of its own.
     *
     * @param timeout the timeout value to report from {@link #getTimeout()}
     * @param callable the work to run; must not be null
     */
    public WebAsyncTask(long timeout, Callable<V> callable) {
        Assert.notNull(callable, "Callable must not be null");
        this.callable = callable;
        this.timeout = timeout;
        this.executorName = null;
        this.executor = null;
    }

    /**
     * Creates a task whose executor is looked up by bean name when requested.
     *
     * @param timeout the timeout value, or null for none
     * @param executorName the bean name of the executor; must not be null
     * @param callable the work to run; must not be null
     */
    public WebAsyncTask(@Nullable Long timeout, String executorName, Callable<V> callable) {
        Assert.notNull(callable, "Callable must not be null");
        Assert.notNull(executorName, "Executor name must not be null");
        this.callable = callable;
        this.timeout = timeout;
        this.executorName = executorName;
        this.executor = null;
    }

    /**
     * Creates a task bound to a concrete executor instance.
     *
     * @param timeout the timeout value, or null for none
     * @param executor the executor to use; must not be null
     * @param callable the work to run; must not be null
     */
    public WebAsyncTask(@Nullable Long timeout, AsyncTaskExecutor executor, Callable<V> callable) {
        Assert.notNull(callable, "Callable must not be null");
        Assert.notNull(executor, "Executor must not be null");
        this.callable = callable;
        this.timeout = timeout;
        this.executorName = null;
        this.executor = executor;
    }

    /**
     * @return the wrapped callable
     */
    public Callable<?> getCallable() {
        return this.callable;
    }

    /**
     * @return the configured timeout, or null if none was given
     */
    @Nullable
    public Long getTimeout() {
        return this.timeout;
    }

    /**
     * Stores the bean factory used to resolve an executor by name.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    /**
     * Resolves the executor for this task: the concrete instance if one was given,
     * otherwise a bean lookup by name, otherwise null.
     *
     * @return the executor, or null when neither an instance nor a name was configured
     * @throws IllegalStateException if a name was configured but no bean factory is set
     *         (raised by the state assertion)
     */
    @Nullable
    public AsyncTaskExecutor getExecutor() {
        if (this.executor != null) {
            return this.executor;
        }
        if (this.executorName == null) {
            return null;
        }
        Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name");
        return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class);
    }

    /**
     * Registers the callback whose return value is used when the request times out.
     */
    public void onTimeout(Callable<V> callback) {
        this.timeoutCallback = callback;
    }

    /**
     * Registers the callback whose return value is used when an error is reported.
     */
    public void onError(Callable<V> callback) {
        this.errorCallback = callback;
    }

    /**
     * Registers the callback run after completion.
     */
    public void onCompletion(Runnable callback) {
        this.completionCallback = callback;
    }

    /**
     * Builds an interceptor that forwards timeout, error and completion events to the
     * callbacks registered on this task. Callbacks are read from the fields at event
     * time, so callbacks registered after this call are still honoured.
     */
    CallableProcessingInterceptor getInterceptor() {
        return new CallableProcessingInterceptor() {

            @Override
            public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
                if (timeoutCallback == null) {
                    return CallableProcessingInterceptor.RESULT_NONE;
                }
                return timeoutCallback.call();
            }

            @Override
            public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
                if (errorCallback == null) {
                    return CallableProcessingInterceptor.RESULT_NONE;
                }
                return errorCallback.call();
            }

            @Override
            public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
                if (completionCallback == null) {
                    return;
                }
                completionCallback.run();
            }
        };
    }
}
DeferredResult
`DeferredResult` 允许在异步请求处理中由应用线程自主设置结果,并注册超时、错误和完成回调。
/**
 * Holder for a result that application code supplies later through {@link #setResult}
 * or {@link #setErrorResult}, with an optional timeout value, an optional timeout
 * fallback value, and timeout/error/completion callbacks.
 *
 * @param <T> the type accepted by {@link #setResult}
 */
public class DeferredResult<T> {
// Sentinel stored in "result" while no value has been set; always compared by identity.
private static final Object RESULT_NONE = new Object();
private static final Log logger = LogFactory.getLog(DeferredResult.class);
@Nullable
private final Long timeoutValue;
// Supplies the value to use on timeout; yields RESULT_NONE when no fallback was configured.
private final Supplier<?> timeoutResult;
@Nullable
private Runnable timeoutCallback;
@Nullable
private Consumer<Throwable> errorCallback;
@Nullable
private Runnable completionCallback;
// Handler waiting for the result; cleared once it has been handed the result.
@Nullable
private DeferredResultHandler resultHandler;
@Nullable
private volatile Object result = RESULT_NONE;
// Set to true by the lifecycle interceptor's afterCompletion callback.
private volatile boolean expired;
/** Creates an instance with no timeout value and no timeout fallback. */
public DeferredResult() {
this(null);
}
/**
 * Creates an instance with the given timeout value and no timeout fallback.
 *
 * @param timeoutValue the timeout value, or null for none
 */
public DeferredResult(@Nullable Long timeoutValue) {
this(timeoutValue, () -> RESULT_NONE);
}
/**
 * Creates an instance with a timeout value and a fixed fallback result for timeouts.
 *
 * @param timeoutValue the timeout value, or null for none
 * @param timeoutResult the value to set as the result when a timeout is handled
 */
public DeferredResult(@Nullable Long timeoutValue, Object timeoutResult) {
this(timeoutValue, () -> timeoutResult);
}
/**
 * Creates an instance with a timeout value and a supplier for the timeout fallback.
 *
 * @param timeoutValue the timeout value, or null for none
 * @param timeoutResult supplier queried when a timeout is handled
 */
public DeferredResult(@Nullable Long timeoutValue, Supplier<?> timeoutResult) {
this.timeoutValue = timeoutValue;
this.timeoutResult = timeoutResult;
}
/**
 * @return true if a result has been set or this instance has been marked expired
 */
public final boolean isSetOrExpired() {
return (this.result != RESULT_NONE || this.expired);
}
/**
 * @return true if a result (possibly null) has been set
 */
public boolean hasResult() {
return (this.result != RESULT_NONE);
}
/**
 * @return the result, or null if none has been set (a null return is therefore
 *         ambiguous with a result that was explicitly set to null)
 */
@Nullable
public Object getResult() {
// Read the volatile field once so the check and the return see the same value.
Object resultToCheck = this.result;
return (resultToCheck != RESULT_NONE ? resultToCheck : null);
}
/**
 * @return the configured timeout value, or null if none
 */
@Nullable
final Long getTimeoutValue() {
return this.timeoutValue;
}
/** Registers the callback run when a timeout is handled. */
public void onTimeout(Runnable callback) {
this.timeoutCallback = callback;
}
/** Registers the callback that receives the Throwable when an error is handled. */
public void onError(Consumer<Throwable> callback) {
this.errorCallback = callback;
}
/** Registers the callback run after completion. */
public void onCompletion(Runnable callback) {
this.completionCallback = callback;
}
/**
 * Registers the handler that receives the result. Does nothing once this instance has
 * expired. If a result is already present it is passed to the handler immediately;
 * otherwise the handler is stored until a result is set.
 *
 * @param resultHandler the handler to register; must not be null
 */
public final void setResultHandler(DeferredResultHandler resultHandler) {
Assert.notNull(resultHandler, "DeferredResultHandler is required");
// Check without the lock first, then again while holding it.
if (this.expired) {
return;
}
Object resultToHandle;
synchronized (this) {
if (this.expired) {
return;
}
resultToHandle = this.result;
if (resultToHandle == RESULT_NONE) {
// No result yet: remember the handler for setResultInternal to invoke later.
this.resultHandler = resultHandler;
return;
}
}
// A result was already set: invoke the handler outside the synchronized block.
// Failures are logged at debug level and not propagated.
try {
resultHandler.handleResult(resultToHandle);
}
catch (Throwable ex) {
logger.debug("Failed to process async result", ex);
}
}
/**
 * Sets the result and passes it to the registered handler, if any.
 *
 * @param result the value to set; may be null
 * @return false if a result was already set or this instance has expired, true otherwise
 */
public boolean setResult(@Nullable T result) {
return setResultInternal(result);
}
// Shared implementation for setResult, setErrorResult and the lifecycle interceptor.
private boolean setResultInternal(@Nullable Object result) {
// Check without the lock first, then again while holding it.
if (isSetOrExpired()) {
return false;
}
DeferredResultHandler resultHandlerToUse;
synchronized (this) {
if (isSetOrExpired()) {
return false;
}
this.result = result;
resultHandlerToUse = this.resultHandler;
if (resultHandlerToUse == null) {
// No handler registered yet; setResultHandler will pick the stored result up.
return true;
}
// Drop the reference so the handler is not retained after it has been used.
this.resultHandler = null;
}
// Invoke the handler outside the synchronized block; exceptions propagate to the caller.
resultHandlerToUse.handleResult(result);
return true;
}
/**
 * Sets an error value as the result; handled exactly like {@link #setResult} here.
 *
 * @param result the error value
 * @return false if a result was already set or this instance has expired, true otherwise
 */
public boolean setErrorResult(Object result) {
return setResultInternal(result);
}
/**
 * @return a new interceptor that drives the timeout, error and completion callbacks
 *         of this instance
 */
final DeferredResultProcessingInterceptor getLifecycleInterceptor() {
return new LifecycleInterceptor();
}
/** Receives the result once it is available. */
@FunctionalInterface
public interface DeferredResultHandler {
void handleResult(@Nullable Object result);
}
// Inner (non-static) class: it reads and writes the enclosing instance's callbacks and state.
private class LifecycleInterceptor implements DeferredResultProcessingInterceptor {
/**
 * Runs the timeout callback, then applies the timeout fallback value if one is supplied.
 *
 * @return false when a fallback value was applied, true otherwise
 */
@Override
public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> result) {
boolean continueProcessing = true;
try {
if (timeoutCallback != null) {
timeoutCallback.run();
}
}
finally {
// Runs even if the callback threw, so the fallback value is still applied.
Object value = timeoutResult.get();
if (value != RESULT_NONE) {
continueProcessing = false;
try {
setResultInternal(value);
}
catch (Throwable ex) {
logger.debug("Failed to handle timeout result", ex);
}
}
}
return continueProcessing;
}
/**
 * Runs the error callback and then sets the Throwable itself as the result.
 *
 * @return always false
 */
@Override
public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> result, Throwable t) {
try {
if (errorCallback != null) {
errorCallback.accept(t);
}
}
finally {
// Runs even if the callback threw; failures while setting the result are only logged.
try {
setResultInternal(t);
}
catch (Throwable ex) {
logger.debug("Failed to handle error result", ex);
}
}
return false;
}
/** Marks the enclosing instance as expired, then runs the completion callback. */
@Override
public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> result) {
expired = true;
if (completionCallback != null) {
completionCallback.run();
}
}
}
}
AsyncContext
`AsyncContext` 是 Servlet 3.0 提供的异步处理上下文,允许释放请求线程并异步执行任务,最终通过手动完成响应或分发来继续处理。
/**
 * Servlet asynchronous-processing context: gives access to the request/response pair
 * and declares operations to dispatch, complete, start a task, register listeners and
 * configure a timeout. Only signatures are shown here, so the exact runtime semantics
 * should be confirmed against the Servlet specification.
 */
public interface AsyncContext {
// String keys under the "jakarta.servlet.async." prefix.
// NOTE(review): presumably request attribute names exposing the original request's
// path elements after an async dispatch; confirm against the Servlet specification.
String ASYNC_REQUEST_URI = "jakarta.servlet.async.request_uri";
String ASYNC_CONTEXT_PATH = "jakarta.servlet.async.context_path";
String ASYNC_MAPPING = "jakarta.servlet.async.mapping";
String ASYNC_PATH_INFO = "jakarta.servlet.async.path_info";
String ASYNC_SERVLET_PATH = "jakarta.servlet.async.servlet_path";
String ASYNC_QUERY_STRING = "jakarta.servlet.async.query_string";
// Accessors for the request/response associated with this context.
ServletRequest getRequest();
ServletResponse getResponse();
boolean hasOriginalRequestAndResponse();
// Dispatch variants: no target, a path, or a path within a given ServletContext.
void dispatch();
void dispatch(String path);
void dispatch(ServletContext context, String path);
// Finishes the asynchronous operation.
void complete();
// Hands the given Runnable to the context for execution.
void start(Runnable run);
// Listener registration and creation.
void addListener(AsyncListener listener);
void addListener(AsyncListener listener, ServletRequest request, ServletResponse response);
<T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException;
// Timeout configuration. NOTE(review): unit is presumably milliseconds; confirm.
void setTimeout(long timeout);
long getTimeout();
}
原理
CallableMethodReturnValueHandler
`CallableMethodReturnValueHandler` 负责处理控制器方法返回的 `Callable` 类型,借助 `WebAsyncManager` 启动异步执行并挂起请求,由 Spring MVC 异步处理机制在任务完成后恢复响应。
// RequestMappingHandlerAdapter.handleInternal ->
// RequestMappingHandlerAdapter.invokeHandlerMethod ->
// ServletInvocableHandlerMethod.invokeAndHandle ->
// HandlerMethodReturnValueHandlerComposite.handleReturnValue ->
// HandlerMethodReturnValueHandlerComposite.selectHandler ->
/**
 * Return value handler for controller methods declared to return a {@link Callable}:
 * hands the callable to the request's async manager for processing.
 */
public class CallableMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

    /**
     * @return true when the declared return type is {@code Callable} or a subtype
     */
    @Override
    public boolean supportsReturnType(MethodParameter returnType) {
        Class<?> declaredType = returnType.getParameterType();
        return Callable.class.isAssignableFrom(declaredType);
    }

    /**
     * Starts callable processing for a non-null return value; a null return value
     * simply marks the request as handled.
     */
    @Override
    public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
            ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

        if (returnValue != null) {
            Callable<?> task = (Callable<?>) returnValue;
            WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(task, mavContainer);
        }
        else {
            mavContainer.setRequestHandled(true);
        }
    }
}
AsyncTaskMethodReturnValueHandler
`AsyncTaskMethodReturnValueHandler` 用于处理返回值为 `WebAsyncTask` 的方法,借助 `WebAsyncManager` 启动异步调用流程,并可注入 `BeanFactory` 以支持自定义线程池与回调配置。
/**
 * Return value handler for controller methods declared to return a {@code WebAsyncTask}:
 * optionally injects the bean factory into the task and hands it to the request's
 * async manager for processing.
 */
public class AsyncTaskMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

    @Nullable
    private final BeanFactory beanFactory;

    /**
     * @param beanFactory factory passed on to each handled task, or null to pass none
     */
    public AsyncTaskMethodReturnValueHandler(@Nullable BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    /**
     * @return true when the declared return type is {@code WebAsyncTask} or a subtype
     */
    @Override
    public boolean supportsReturnType(MethodParameter returnType) {
        Class<?> declaredType = returnType.getParameterType();
        return WebAsyncTask.class.isAssignableFrom(declaredType);
    }

    /**
     * Starts processing of a non-null task; a null return value simply marks the
     * request as handled.
     */
    @Override
    public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
            ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

        if (returnValue != null) {
            WebAsyncTask<?> task = (WebAsyncTask<?>) returnValue;
            if (this.beanFactory != null) {
                // Lets the task resolve an executor by bean name.
                task.setBeanFactory(this.beanFactory);
            }
            WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(task, mavContainer);
        }
        else {
            mavContainer.setRequestHandled(true);
        }
    }
}
DeferredResultMethodReturnValueHandler
`DeferredResultMethodReturnValueHandler` 处理控制器返回的 `DeferredResult`、`ListenableFuture` 或 `CompletionStage`,通过 `WebAsyncManager` 启动异步处理,挂起请求并在结果就绪时恢复响应流程。
// RequestMappingHandlerAdapter.handleInternal ->
// RequestMappingHandlerAdapter.invokeHandlerMethod ->
// ServletInvocableHandlerMethod.invokeAndHandle ->
// HandlerMethodReturnValueHandlerComposite.handleReturnValue ->
// HandlerMethodReturnValueHandlerComposite.selectHandler ->
/**
 * Return value handler for {@code DeferredResult}, {@code ListenableFuture} and
 * {@link CompletionStage} return types. Futures and completion stages are adapted to a
 * {@code DeferredResult}, which is then handed to the request's async manager.
 */
public class DeferredResultMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

    /**
     * @return true when the declared return type is assignable to one of the three
     *         supported types
     */
    @SuppressWarnings({"deprecation", "removal"})
    @Override
    public boolean supportsReturnType(MethodParameter returnType) {
        Class<?> type = returnType.getParameterType();
        if (DeferredResult.class.isAssignableFrom(type)) {
            return true;
        }
        if (org.springframework.util.concurrent.ListenableFuture.class.isAssignableFrom(type)) {
            return true;
        }
        return CompletionStage.class.isAssignableFrom(type);
    }

    /**
     * Starts deferred-result processing for a non-null return value; a null return
     * value simply marks the request as handled.
     *
     * @throws IllegalStateException if the return value is none of the supported types
     */
    @SuppressWarnings({"deprecation", "removal"})
    @Override
    public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
            ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

        if (returnValue == null) {
            mavContainer.setRequestHandled(true);
            return;
        }

        DeferredResult<?> deferred = toDeferredResult(returnValue);
        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferred, mavContainer);
    }

    /** Returns the value itself if it is a DeferredResult, otherwise an adapter around it. */
    @SuppressWarnings({"deprecation", "removal"})
    private DeferredResult<?> toDeferredResult(Object returnValue) {
        if (returnValue instanceof DeferredResult<?> deferredResult) {
            return deferredResult;
        }
        if (returnValue instanceof org.springframework.util.concurrent.ListenableFuture<?> listenableFuture) {
            return adaptListenableFuture(listenableFuture);
        }
        if (returnValue instanceof CompletionStage<?> completionStage) {
            return adaptCompletionStage(completionStage);
        }
        // Should not happen...
        throw new IllegalStateException("Unexpected return value type: " + returnValue);
    }

    /** Bridges future success/failure callbacks onto a new DeferredResult. */
    @SuppressWarnings({"deprecation", "removal"})
    private DeferredResult<Object> adaptListenableFuture(org.springframework.util.concurrent.ListenableFuture<?> future) {
        DeferredResult<Object> adapted = new DeferredResult<>();
        future.addCallback(new org.springframework.util.concurrent.ListenableFutureCallback<Object>() {

            @Override
            public void onSuccess(@Nullable Object value) {
                adapted.setResult(value);
            }

            @Override
            public void onFailure(Throwable ex) {
                adapted.setErrorResult(ex);
            }
        });
        return adapted;
    }

    /**
     * Bridges stage completion onto a new DeferredResult. A {@link CompletionException}
     * with a non-null cause is unwrapped so the cause becomes the error result.
     */
    private DeferredResult<Object> adaptCompletionStage(CompletionStage<?> future) {
        DeferredResult<Object> adapted = new DeferredResult<>();
        future.whenComplete((value, failure) -> {
            if (failure == null) {
                adapted.setResult(value);
                return;
            }
            Throwable error = failure;
            if (failure instanceof CompletionException && failure.getCause() != null) {
                error = failure.getCause();
            }
            adapted.setErrorResult(error);
        });
        return adapted;
    }
}
WebAsyncManager
`WebAsyncManager` 是 Spring MVC 异步请求的核心管理器,负责协调异步任务执行、超时和错误处理,并在结果就绪后触发异步分派以恢复请求流程。
/**
 * Coordinates asynchronous request processing for one request: starts processing of a
 * {@code Callable}/{@code WebAsyncTask} or a {@code DeferredResult}, wires timeout,
 * error and completion handlers on the {@code AsyncWebRequest}, stores the concurrent
 * result and triggers a dispatch once a result is available.
 *
 * <p>This is an excerpt: the {@code State} enum, {@code formatUri} and the interceptor
 * chain classes referenced below are not shown here.
 */
public final class WebAsyncManager {
// Sentinel meaning "no concurrent result yet"; compared by identity.
private static final Object RESULT_NONE = new Object();
// Executor used unless a WebAsyncTask supplies its own.
private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName());
private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
// Shared interceptors appended last to every interceptor list built below.
private static final CallableProcessingInterceptor timeoutCallableInterceptor = new TimeoutCallableProcessingInterceptor();
private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor();
@Nullable
private AsyncWebRequest asyncWebRequest;
private AsyncTaskExecutor taskExecutor = DEFAULT_TASK_EXECUTOR;
private boolean isMultipartRequestParsed;
@Nullable
private volatile Object concurrentResult = RESULT_NONE;
@Nullable
private volatile Object[] concurrentResultContext;
// Lifecycle state; transitions are made with compareAndSet.
private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
// Insertion-ordered maps of registered interceptors.
private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<>();
private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors = new LinkedHashMap<>();
// Package-private constructor.
WebAsyncManager() {}
/**
 * Wraps the callable in a {@code WebAsyncTask} and delegates to the overload below.
 *
 * @param callable the work to run; must not be null
 * @param processingContext additional context stored alongside the concurrent result
 * @throws Exception if starting the processing fails
 */
@SuppressWarnings({"rawtypes", "unchecked"})
public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception {
Assert.notNull(callable, "Callable must not be null");
startCallableProcessing(new WebAsyncTask(callable), processingContext);
}
/**
 * Starts asynchronous processing of the given task: applies its timeout and executor,
 * registers timeout/error/completion handlers, starts the async request and submits
 * the callable to the task executor.
 *
 * @param webAsyncTask the task to run; must not be null
 * @param processingContext additional context stored alongside the concurrent result
 * @throws IllegalStateException if no AsyncWebRequest is set or the state is not NOT_STARTED
 * @throws Exception if starting the processing fails
 */
@SuppressWarnings("NullAway")
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
// Only one async processing may be started per state cycle.
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
throw new IllegalStateException(
"Unexpected call to startCallableProcessing: [" + this.state.get() + "]");
}
Long timeout = webAsyncTask.getTimeout();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
// A task-specific executor replaces the manager's executor.
AsyncTaskExecutor executor = webAsyncTask.getExecutor();
if (executor != null) {
this.taskExecutor = executor;
}
// Order: the task's own interceptor, then registered ones, then the shared timeout interceptor.
List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(webAsyncTask.getInterceptor());
interceptors.addAll(this.callableInterceptors.values());
interceptors.add(timeoutCallableInterceptor);
final Callable<?> callable = webAsyncTask.getCallable();
final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
// On timeout: dispatch only if an interceptor produced a result.
this.asyncWebRequest.addTimeoutHandler(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
}
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
setConcurrentResultAndDispatch(result);
}
});
// On error: always dispatch, falling back to the exception itself as the result.
this.asyncWebRequest.addErrorHandler(ex -> {
if (logger.isDebugEnabled()) {
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest) + ": " + ex);
}
// Client-disconnect errors are wrapped in AsyncRequestNotUsableException.
if (DisconnectedClientHelper.isClientDisconnectedException(ex)) {
ex = new AsyncRequestNotUsableException(
"Servlet container error notification for disconnected client", ex);
}
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
setConcurrentResultAndDispatch(result);
});
this.asyncWebRequest.addCompletionHandler(() -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
startAsyncProcessing(processingContext);
try {
Future<?> future = this.taskExecutor.submit(() -> {
Object result = null;
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
result = callable.call();
}
catch (Throwable ex) {
// A failure in pre-processing or in the callable becomes the result.
result = ex;
}
finally {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
}
setConcurrentResultAndDispatch(result);
});
interceptorChain.setTaskFuture(future);
}
catch (Throwable ex) {
// Submitting (or registering the future) failed: post-process and dispatch the exception.
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
setConcurrentResultAndDispatch(result);
}
}
/**
 * Starts asynchronous processing of the given deferred result: applies its timeout,
 * registers timeout/error/completion handlers, starts the async request and installs
 * a result handler that post-processes and dispatches the result.
 *
 * @param deferredResult the deferred result; must not be null
 * @param processingContext additional context stored alongside the concurrent result
 * @throws IllegalStateException if no AsyncWebRequest is set or the state is not NOT_STARTED
 * @throws Exception if starting the processing fails
 */
@SuppressWarnings("NullAway")
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
throw new IllegalStateException(
"Unexpected call to startDeferredResultProcessing: [" + this.state.get() + "]");
}
Long timeout = deferredResult.getTimeoutValue();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
// Order: the deferred result's lifecycle interceptor, registered ones, then the shared timeout interceptor.
List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(deferredResult.getLifecycleInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
}
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
// The empty-bodied lock acquisition only waits for a concurrent holder of this monitor.
synchronized (WebAsyncManager.this) {
// If application thread set the DeferredResult first in a race,
// we must still not return until setConcurrentResultAndDispatch is done
return;
}
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
});
this.asyncWebRequest.addErrorHandler(ex -> {
if (logger.isDebugEnabled()) {
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest));
}
// Client-disconnect errors are wrapped in AsyncRequestNotUsableException.
if (DisconnectedClientHelper.isClientDisconnectedException(ex)) {
ex = new AsyncRequestNotUsableException(
"Servlet container error notification for disconnected client", ex);
}
try {
interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex);
// The empty-bodied lock acquisition only waits for a concurrent holder of this monitor.
synchronized (WebAsyncManager.this) {
// If application thread set the DeferredResult first in a race,
// we must still not return until setConcurrentResultAndDispatch is done
return;
}
}
catch (Throwable interceptorEx) {
setConcurrentResultAndDispatch(interceptorEx);
}
});
this.asyncWebRequest.addCompletionHandler(() ->
interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
startAsyncProcessing(processingContext);
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
// The handler runs when the deferred result receives (or already has) a value.
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
// Resets the stored result and context under the lock, then starts the async request.
private void startAsyncProcessing(Object[] processingContext) {
synchronized (WebAsyncManager.this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = processingContext;
}
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
if (logger.isDebugEnabled()) {
logger.debug("Started async request for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.startAsync();
}
// Stores the result and dispatches, all while holding this manager's monitor.
// Only the first caller to move the state from ASYNC_PROCESSING to RESULT_SET wins;
// later results are logged at debug level and ignored.
private void setConcurrentResultAndDispatch(@Nullable Object result) {
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
synchronized (WebAsyncManager.this) {
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
if (logger.isDebugEnabled()) {
logger.debug("Async result already set: [" + this.state.get() + "], " +
"ignored result for " + formatUri(this.asyncWebRequest));
}
return;
}
this.concurrentResult = result;
if (logger.isDebugEnabled()) {
logger.debug("Async result set for " + formatUri(this.asyncWebRequest));
}
// The result stays stored, but no dispatch is attempted on a completed request.
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.dispatch();
}
}
}
StandardServletAsyncWebRequest
`StandardServletAsyncWebRequest` 是 Spring 框架中封装 Servlet 3.0 异步处理机制的类,用于管理异步请求的启动、超时和完成通知。
/**
 * {@code AsyncWebRequest} implementation backed by the Servlet async API. It starts the
 * async context, registers itself as an {@code AsyncListener} and holds the timeout,
 * exception and completion handler lists.
 *
 * <p>This is an excerpt: the remaining members (handler registration, listener
 * callbacks, {@code State}, {@code LifecycleHttpServletResponse}, ...) are elided.
 */
public class StandardServletAsyncWebRequest extends ServletWebRequest implements AsyncWebRequest, AsyncListener {
private final List<Runnable> timeoutHandlers = new ArrayList<>();
private final List<Consumer<Throwable>> exceptionHandlers = new ArrayList<>();
private final List<Runnable> completionHandlers = new ArrayList<>();
// Applied to the AsyncContext in startAsync() when non-null.
@Nullable
private Long timeout;
// Null until startAsync() has been called.
@Nullable
private AsyncContext asyncContext;
private State state;
private final ReentrantLock stateLock = new ReentrantLock();
/**
 * Creates an instance for the given request/response with no previous request.
 */
public StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
this(request, response, null);
}
/**
 * Creates an instance whose response is wrapped in a {@code LifecycleHttpServletResponse}.
 * The initial state is copied from {@code previousRequest} when given, otherwise it is NEW.
 */
@SuppressWarnings("NullAway")
StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response,
@Nullable StandardServletAsyncWebRequest previousRequest) {
super(request, new LifecycleHttpServletResponse(response));
this.state = (previousRequest != null ? previousRequest.state : State.NEW);
// Link the wrapped response back to this request.
//noinspection DataFlowIssue
((LifecycleHttpServletResponse) getResponse()).setAsyncWebRequest(this);
}
/**
 * Starts Servlet async processing unless it has already been started.
 *
 * @throws IllegalStateException if async is not supported for the request, or the
 *         state is neither NEW nor ASYNC
 */
@Override
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
"in async request processing. This is done in Java code using the Servlet API " +
"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
"filter declarations in web.xml.");
if (isAsyncStarted()) {
return;
}
// NEW moves to ASYNC; any other state must already be ASYNC.
if (this.state == State.NEW) {
this.state = State.ASYNC;
}
else {
Assert.state(this.state == State.ASYNC, "Cannot start async: [" + this.state + "]");
}
this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
// Register this object to receive the async lifecycle notifications.
this.asyncContext.addListener(this);
if (this.timeout != null) {
this.asyncContext.setTimeout(this.timeout);
}
}
/**
 * Dispatches via the async context unless async processing is already complete.
 *
 * @throws IllegalStateException if {@link #startAsync()} has not created the context yet
 */
@Override
public void dispatch() {
Assert.state(this.asyncContext != null, "AsyncContext not yet initialized");
if (!this.isAsyncComplete()) {
this.asyncContext.dispatch();
}
}
// ...
}
实战
/**
 * Demo application exposing one endpoint per asynchronous request-handling style:
 * {@code DeferredResult}, {@code WebAsyncTask}, {@code Callable} and the raw Servlet
 * {@code AsyncContext}.
 */
@RestController
@SpringBootApplication
public class Application {

    @Autowired
    @Qualifier(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)
    private AsyncTaskExecutor applicationTaskExecutor;

    /**
     * Returns a {@code DeferredResult} with a 3000 timeout value and a "timeout" fallback.
     * The worker sleeps for 5000 ms before setting the result, so the fallback is what
     * this endpoint is meant to demonstrate.
     *
     * @return the deferred result completed by the worker or by the timeout fallback
     */
    @GetMapping("/deferredResult")
    public DeferredResult<String> deferredResult() {
        DeferredResult<String> deferredResult = new DeferredResult<>(3000L, "timeout");
        applicationTaskExecutor.execute(() -> {
            try {
                Thread.sleep(5000);
                deferredResult.setResult("deferredResult");
            } catch (InterruptedException e) {
                // Restore the flag so the executor can observe the interruption, and
                // report the failure instead of silently dropping it.
                Thread.currentThread().interrupt();
                deferredResult.setErrorResult(e);
            }
        });
        return deferredResult;
    }

    /**
     * Returns a {@code WebAsyncTask} with a 3000 timeout value whose callable sleeps
     * for 1000 ms and then yields "webAsyncTask".
     *
     * @return the task to be run by the MVC async machinery
     */
    @GetMapping("/webAsyncTask")
    public WebAsyncTask<String> webAsyncTask() {
        return new WebAsyncTask<String>(3000L, () -> {
            Thread.sleep(1000);
            return "webAsyncTask";
        });
    }

    /**
     * Returns a {@code Callable} that immediately yields "callable".
     *
     * @return the callable to be run by the MVC async machinery
     */
    @GetMapping("/callable")
    public Callable<String> callable() {
        return () -> "callable";
    }

    /**
     * Uses the Servlet {@code AsyncContext} directly: the started task sleeps for
     * 2000 ms, writes "asyncContext" to the response and always completes the context.
     *
     * @param request the current request, used to start async processing
     * @param response the current response, written to by the async task
     */
    @GetMapping("/asyncContext")
    public void asyncContext(HttpServletRequest request, HttpServletResponse response) {
        AsyncContext asyncContext = request.startAsync();
        asyncContext.start(() -> {
            try {
                Thread.sleep(2000);
                response.getWriter().write("asyncContext");
            } catch (InterruptedException e) {
                // Restore the flag so the container thread can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always complete, otherwise the request stays open until the container times out.
                asyncContext.complete();
            }
        });
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}