SpiLoader
SPI 是一种服务发现机制,它允许第三方为某个接口提供实现,而这些实现可以在运行时被发现和加载。简单来说,就是定义一个接口,然后允许别人提供这个接口的具体实现,而主程序不需要在编译时就知道这些实现类的具体细节,只在运行时去发现和加载它们。 Sentinel 并没有直接使用 JDK 内置的 ServiceLoader
,而是自己实现了一套 SpiLoader
。这主要是因为 Sentinel 的 SpiLoader
提供了更灵活的控制,例如:
排序 (@Spi 注解的 order 属性): 可以控制加载的实现类的优先级,从而决定它们在 Slot Chain 中的执行顺序。
单例 (@Spi 注解的 isSingleton 属性): 可以指定某个 SPI 实现是否是单例模式。
按需加载: 更好地控制加载时机。
控制台通讯
服务端与控制台初始化都是通过InitFunc来实现,InitFunc是一个接口,服务加载是通过SpiLoader来加载。默认懒加载在第一次调用时加载Env类静态块执行,如果配置了非懒加载在自动装配类SentinelWebAutoConfiguration的init方法也会启动时加载
public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
这里doInit()方法会获取所有配置的InitFunc服务进行初始化。
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : initFuncs) {
RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();//调用init方法初始化
RecordLog.info("[InitExecutor] Executing {} with order {}",
w.func.getClass().getCanonicalName(), w.order);
}
在sentinel-transport-common-1.8.5.jar包的META-INF/services下就定义了两个和控制台通讯相关的InitFunc:CommandCenterInitFunc和HeartbeatSenderInitFunc。
心跳检测
HeartbeatSenderInitFunc主要建立应用控制台和应用程序直接的心跳检测。将客户端的消息通讯地址发送给控制台。这个在控制台的机器列表可以看到对应信息。
来看HeartbeatSenderInitFunc.init()方法:
public void init() {
//这里还是通过SpiLoader获取HeartbeatSender的服务实例
HeartbeatSender的服务实例 sender = HeartbeatSenderProvider.getHeartbeatSender();
if (sender == null) {
RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
return;
}
//初始化定时线程池ScheduledThreadPoolExecutor
initSchedulerIfNeeded();
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
//启动定时发送心跳
scheduleHeartbeatTask(sender, interval);
}
启动定时方法:
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
sender.sendHeartbeat();
} catch (Throwable e) {
RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
}
}
}, 5000, interval, TimeUnit.MILLISECONDS);
RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
+ sender.getClass().getCanonicalName());
}
这里看到每隔5秒发送一次心跳包。
HeartbeatSender的spi在sentinel-transport-simple-http-1.8.5.jar包中定义实现为SimpleHttpHeartbeatSender。来看下具体心跳包发送内容。
SimpleHttpHeartbeatSender.sendHeartbeat()主要代码如下
public boolean sendHeartbeat() throws Exception {
//获取控制台地址
Endpoint addrInfo = getAvailableAddress();
//构造请求对象
SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
//设置心跳报文
request.setParams(heartBeat.generateCurrentMessage());
//发送心跳请求
SimpleHttpResponse response = httpClient.post(request);
return false;
}
这里heartBeat变量是HeartbeatMessage类型
HeartbeatMessage#generateCurrentMessage()
public HeartbeatMessage() {
message.put("hostname", HostNameUtil.getHostName());
message.put("ip", TransportConfig.getHeartbeatClientIp());
message.put("app", AppNameUtil.getAppName());
// Put application type (since 1.6.0).
message.put("app_type", String.valueOf(SentinelConfig.getAppType()));
message.put("port", String.valueOf(TransportConfig.getPort()));
}
public Map<String, String> generateCurrentMessage() {
// Version of Sentinel.
message.put("v", Constants.SENTINEL_VERSION);
// Actually timestamp.
message.put("version", String.valueOf(TimeUtil.currentTimeMillis()));
message.put("port", String.valueOf(TransportConfig.getPort()));
return message;
}
这里将本服务的ip和端口信息发送给console,这样控制台就可以与应用程序通过该端口进行通讯。
数据通讯
应用端通讯服务初始化是通过CommandCenterInitFunc。
CommandCenterInitFunc.init()方法调用CommandCenterProvider.getCommandCenter()获取CommandCenter。最后通过SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();还是SPI机制。这个默认配置在 sentinel-transport-simple-http-1.8.5.jar包中
默认的CommandCenter是SimpleHttpCommandCenter。
SimpleHttpCommandCenter的start()方法会启动一个ServerSocket来和sentinel的console来进行通讯。默认端口是8719,可通过csp.sentinel.api.port指定。这样console会不断从应用拉取流量控制数据,并且在console端配置新的流量规则可以推送到应用端。
有请求指令到来时会交给HttpEventTask来执行具体指令,不同的指令由不同的CommandHandler实现类来处理,像修改流量控制会使用ModifyRulesCommandHandler来处理。ModifyRulesCommandHandler.handler()方法根据不同的流控规则来更新内存控制规则。
ModifyRulesCommandHandler.handler()
public CommandResponse<String> handle(CommandRequest request) {
String type = request.getParam("type");
// rule data in get parameter
String data = request.getParam("data");
...
String result = "success";
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
//流量控制规则刷新
FlowRuleManager.loadRules(flowRules);
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
//黑白名单刷新
AuthorityRuleManager.loadRules(rules);
if (!writeToDataSource(getAuthorityDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
//降级规则刷新
DegradeRuleManager.loadRules(rules);
if (!writeToDataSource(getDegradeDataSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
//系统规则刷新
SystemRuleManager.loadRules(rules);
if (!writeToDataSource(getSystemSource(), rules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
流量控制工作原理
在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName
),每次资源调用都会创建一个 Entry
对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU
API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)链表。
springboot自动装配通过SentinelWebAutoConfiguration类自动装配SentinelWebInterceptor。SentinelWebInterceptor实现了HandlerInterceptor接口,是一个拦截器,在请求到达controller之前通过preHandle()可以执行自定义操作。
AbstractSentinelInterceptor#preHandle()
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
//请求资源名称
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
// Parse the request origin using registered origin parser.
String origin = parseOrigin(request);
String contextName = getContextName(request);
ContextUtil.enter(contextName, origin);
//最重要的方法,内部调用不同的功能slot进行流量控制
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
//如果发生BlockException,则触发流量控制规则
try {
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
SphU.entry()
SphU的默认实例是CtSph,最后会进入其entryWithPriority()方法
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//获取执行链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
lookProcessChain()方法通过SlotChainProvider.newSlotChain()实例slot链,最后通过
SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault().build()
使用SPI机制从classpath中加载默认的slot链。
在sentinel-core-1.8.5.jar包中有对应的默认配置文件
META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
默认配置内容
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
每一个slot都有不同的作用,作为功能链表依次进行调用。
NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;ClusterBuilderSlot
则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;StatisticSlot
则用于记录、统计不同纬度的 runtime 指标监控信息;FlowSlot
则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;AuthoritySlot
则根据配置的黑白名单和调用来源信息,来做黑白名单控制;DegradeSlot
则通过统计信息以及预设的规则,来做熔断降级;SystemSlot
则通过系统的状态,例如 load1 等,来控制总的入口流量;