Spring Cloud Alibaba Sentinel 基本工作原理源码阅读

发布于:2025-07-24 ⋅ 阅读:(34) ⋅ 点赞:(0)

SpiLoader

SPI 是一种服务发现机制,它允许第三方为某个接口提供实现,而这些实现可以在运行时被发现和加载。简单来说,就是定义一个接口,然后允许别人提供这个接口的具体实现,而主程序不需要在编译时就知道这些实现类的具体细节,只在运行时去发现和加载它们。 Sentinel 并没有直接使用 JDK 内置的 ServiceLoader,而是自己实现了一套 SpiLoader。这主要是因为 Sentinel 的 SpiLoader 提供了更灵活的控制,例如:

排序 (@Spi 注解的 order 属性): 可以控制加载的实现类的优先级,从而决定它们在 Slot Chain 中的执行顺序。

单例 (@Spi 注解的 isSingleton 属性): 可以指定某个 SPI 实现是否是单例模式。

按需加载: 更好地控制加载时机。

控制台通讯

服务端与控制台初始化都是通过InitFunc来实现,InitFunc是一个接口,服务加载是通过SpiLoader来加载。默认懒加载在第一次调用时加载Env类静态块执行,如果配置了非懒加载在自动装配类SentinelWebAutoConfiguration的init方法也会启动时加载

public class Env {

    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }

}

这里doInit()方法会获取所有配置的InitFunc服务进行初始化。

            List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
            List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
            for (InitFunc initFunc : initFuncs) {
                RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
                insertSorted(initList, initFunc);
            }
            for (OrderWrapper w : initList) {
                w.func.init();//调用init方法初始化
                RecordLog.info("[InitExecutor] Executing {} with order {}",
                    w.func.getClass().getCanonicalName(), w.order);
            }

在sentinel-transport-common-1.8.5.jar包的META-INF/services下就定义了两个和控制台通讯相关的InitFunc:CommandCenterInitFunc和HeartbeatSenderInitFunc。

心跳检测

HeartbeatSenderInitFunc主要建立应用控制台和应用程序直接的心跳检测。将客户端的消息通讯地址发送给控制台。这个在控制台的机器列表可以看到对应信息。

来看HeartbeatSenderInitFunc.init()方法:

    public void init() {
    //这里还是通过SpiLoader获取HeartbeatSender的服务实例
        HeartbeatSender的服务实例 sender = HeartbeatSenderProvider.getHeartbeatSender();
        if (sender == null) {
            RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
            return;
        }
		//初始化定时线程池ScheduledThreadPoolExecutor
        initSchedulerIfNeeded();
        long interval = retrieveInterval(sender);
        setIntervalIfNotExists(interval);
        //启动定时发送心跳
        scheduleHeartbeatTask(sender, interval);
    }

启动定时方法:

    private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    sender.sendHeartbeat();
                } catch (Throwable e) {
                    RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
                }
            }
        }, 5000, interval, TimeUnit.MILLISECONDS);
        RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
            + sender.getClass().getCanonicalName());
    }

这里看到每隔5秒发送一次心跳包。

HeartbeatSender的spi在sentinel-transport-simple-http-1.8.5.jar包中定义实现为SimpleHttpHeartbeatSender。来看下具体心跳包发送内容。

SimpleHttpHeartbeatSender.sendHeartbeat()主要代码如下

    public boolean sendHeartbeat() throws Exception {
      	//获取控制台地址
        Endpoint addrInfo = getAvailableAddress();
       //构造请求对象
        SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
      //设置心跳报文
        request.setParams(heartBeat.generateCurrentMessage());
        //发送心跳请求
        SimpleHttpResponse response = httpClient.post(request);     
        return false;
    }

这里heartBeat变量是HeartbeatMessage类型

HeartbeatMessage#generateCurrentMessage()

    public HeartbeatMessage() {
        message.put("hostname", HostNameUtil.getHostName());
        message.put("ip", TransportConfig.getHeartbeatClientIp());
        message.put("app", AppNameUtil.getAppName());
        // Put application type (since 1.6.0).
        message.put("app_type", String.valueOf(SentinelConfig.getAppType()));
        message.put("port", String.valueOf(TransportConfig.getPort()));
    }    
public Map<String, String> generateCurrentMessage() {
        // Version of Sentinel.
        message.put("v", Constants.SENTINEL_VERSION);
        // Actually timestamp.
        message.put("version", String.valueOf(TimeUtil.currentTimeMillis()));
        message.put("port", String.valueOf(TransportConfig.getPort()));
        return message;
    }

这里将本服务的ip和端口信息发送给console,这样控制台就可以与应用程序通过该端口进行通讯。

数据通讯

应用端通讯服务初始化是通过CommandCenterInitFunc。

CommandCenterInitFunc.init()方法调用CommandCenterProvider.getCommandCenter()获取CommandCenter。最后通过SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();还是SPI机制。这个默认配置在 sentinel-transport-simple-http-1.8.5.jar包中

默认的CommandCenter是SimpleHttpCommandCenter。

SimpleHttpCommandCenter的start()方法会启动一个ServerSocket来和sentinel的console来进行通讯。默认端口是8719,可通过csp.sentinel.api.port指定。这样console会不断从应用拉取流量控制数据,并且在console端配置新的流量规则可以推送到应用端。

有请求指令到来时会交给HttpEventTask来执行具体指令,不同的指令由不同的CommandHandler实现类来处理,像修改流量控制会使用ModifyRulesCommandHandler来处理。ModifyRulesCommandHandler.handler()方法根据不同的流控规则来更新内存控制规则。

ModifyRulesCommandHandler.handler()

    public CommandResponse<String> handle(CommandRequest request) {
        
        String type = request.getParam("type");
        // rule data in get parameter
        String data = request.getParam("data");
		...
        String result = "success";

        if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
            List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
            //流量控制规则刷新
            FlowRuleManager.loadRules(flowRules);
            if (!writeToDataSource(getFlowDataSource(), flowRules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
            List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
           //黑白名单刷新
            AuthorityRuleManager.loadRules(rules);
            if (!writeToDataSource(getAuthorityDataSource(), rules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
            List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
          //降级规则刷新
            DegradeRuleManager.loadRules(rules);
            if (!writeToDataSource(getDegradeDataSource(), rules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
            List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
          //系统规则刷新
            SystemRuleManager.loadRules(rules);
            if (!writeToDataSource(getSystemSource(), rules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);
        }
        return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
    }

流量控制工作原理

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)链表。

springboot自动装配通过SentinelWebAutoConfiguration类自动装配SentinelWebInterceptor。SentinelWebInterceptor实现了HandlerInterceptor接口,是一个拦截器,在请求到达controller之前通过preHandle()可以执行自定义操作。

AbstractSentinelInterceptor#preHandle()

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
        throws Exception {
        try {
           //请求资源名称
            String resourceName = getResourceName(request);

            if (StringUtil.isEmpty(resourceName)) {
                return true;
            }
            
            if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
                return true;
            }
            
            // Parse the request origin using registered origin parser.
            String origin = parseOrigin(request);
            String contextName = getContextName(request);
            ContextUtil.enter(contextName, origin);
           //最重要的方法,内部调用不同的功能slot进行流量控制
            Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
            return true;
        } catch (BlockException e) {
          //如果发生BlockException,则触发流量控制规则
            try {
                handleBlockException(request, response, e);
            } finally {
                ContextUtil.exit();
            }
            return false;
        }
    }

SphU.entry()

SphU的默认实例是CtSph,最后会进入其entryWithPriority()方法

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
		//获取执行链
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

lookProcessChain()方法通过SlotChainProvider.newSlotChain()实例slot链,最后通过

SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault().build()

使用SPI机制从classpath中加载默认的slot链。

在sentinel-core-1.8.5.jar包中有对应的默认配置文件

META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot

默认配置内容

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

每一个slot都有不同的作用,作为功能链表依次进行调用。

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;

  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;

  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;

  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;

  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;

  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;

  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;


网站公告

今日签到

点亮在社区的每一天
去签到