调整了@Bulkhead
j加了type = Type.THREADPOOL
,发现程序无法进入这个方法
@CircuitBreaker(name = "licenseService", fallbackMethod = "buildFallbackLicenseList")
@Bulkhead(name = "bulkheadLicenseService", fallbackMethod = "buildFallbackLicenseList", type = Type.THREADPOOL)
@Retry(name = "retryLicenseService", fallbackMethod = "buildFallbackLicenseList")
@RateLimiter(name = "licenseService", fallbackMethod = "buildFallbackLicenseList")
public List<License> getLicensesByOrganization(String organizationId) {
LOGGER.info("find all licenses belong to {}", organizationId);
LicenseExample example = new LicenseExample();
example.createCriteria().andOrganizationIdEqualTo(organizationId);
return licenseMapper.selectByExample(example);
}
debug发现io.github.resilience4j.spring6.bulkhead.configure.BulkheadAspect
类里面对@Bulkhead
的type
属性进行了判断
@Around(value = "matchAnnotatedClassOrMethod(bulkheadAnnotation)", argNames = "proceedingJoinPoint, bulkheadAnnotation")
public Object bulkheadAroundAdvice(ProceedingJoinPoint proceedingJoinPoint,
@Nullable Bulkhead bulkheadAnnotation) throws Throwable {
Method method = ((MethodSignature) proceedingJoinPoint.getSignature()).getMethod();
String methodName = method.getDeclaringClass().getName() + "#" + method.getName();
if (bulkheadAnnotation == null) {
bulkheadAnnotation = getBulkheadAnnotation(proceedingJoinPoint);
}
if (bulkheadAnnotation == null) { //because annotations wasn't found
return proceedingJoinPoint.proceed();
}
Class<?> returnType = method.getReturnType();
String backend = spelResolver.resolve(method, proceedingJoinPoint.getArgs(), bulkheadAnnotation.name());
if (bulkheadAnnotation.type() == Bulkhead.Type.THREADPOOL) {
final CheckedSupplier<Object> bulkheadExecution =
() -> proceedInThreadPoolBulkhead(proceedingJoinPoint, methodName, returnType, backend);
return fallbackExecutor.execute(proceedingJoinPoint, method, bulkheadAnnotation.fallbackMethod(), bulkheadExecution);
} else {
io.github.resilience4j.bulkhead.Bulkhead bulkhead = getOrCreateBulkhead(methodName,
backend);
final CheckedSupplier<Object> bulkheadExecution = () -> proceed(proceedingJoinPoint, methodName, bulkhead, returnType);
return fallbackExecutor.execute(proceedingJoinPoint, method, bulkheadAnnotation.fallbackMethod(), bulkheadExecution);
}
}
bulkheadAroundAdvice
方法中使用if-else对type
进行不同处理,type=Type.THREADPOOL
时调用proceedInThreadPoolBulkhead
方法
private Object proceedInThreadPoolBulkhead(ProceedingJoinPoint proceedingJoinPoint,
String methodName, Class<?> returnType, String backend) throws Throwable {
if (logger.isDebugEnabled()) {
logger.debug("ThreadPool bulkhead invocation for method {} in backend {}", methodName,
backend);
}
ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(backend);
if (CompletionStage.class.isAssignableFrom(returnType)) {
// threadPoolBulkhead.executeSupplier throws a BulkheadFullException, if the Bulkhead is full.
// The RuntimeException is converted into an exceptionally completed future
try {
return threadPoolBulkhead.executeCallable(() -> {
try {
return ((CompletionStage<?>) proceedingJoinPoint.proceed())
.toCompletableFuture().get();
} catch (ExecutionException e) {
throw new CompletionException(e.getCause());
} catch (InterruptedException | CancellationException e) {
throw e;
} catch (Throwable e) {
throw new CompletionException(e);
}
});
} catch (BulkheadFullException ex){
CompletableFuture<?> future = new CompletableFuture<>();
future.completeExceptionally(ex);
return future;
}
} else {
throw new IllegalStateException(
"ThreadPool bulkhead is only applicable for completable futures ");
}
}
proceedInThreadPoolBulkhead
方法判断返回值类型, 如果是CompletionStage
及子类(比如CompletableFuture
)就能处理,否则抛IllegalStateException
因为我们的返回值类型是List<License>
,所以会进入else中抛出IllegalStateException