归档
测试
参考:热点流控-测试
新建
ClusterDemoApplication2
public class ClusterDemoApplication2 {
public static void main(String[] args) {
System.setProperty("csp.sentinel.dashboard.server", "127.0.0.1:8080");
System.setProperty("project.name", "My-Cluster-8866");
System.setProperty("server.port", "10012");
SpringApplication.run(ClusterDemoApplication.class, args);
}
}
启动 2 应用后
- 访问:http://localhost:10010/hello/test1
- 访问:http://localhost:10012/hello/test1
在控制台设置
- 点击 集群流控:http://127.0.0.1:8080/#/dashboard/cluster/server/My-Cluster-8866
- 点击 + 新增 Token Server:
- 机器类型 使用默认:
应用内机器
- 选择机器 选第一个即可
- Server 端口 使用默认:
18730
- 最大允许 QPS 设置为:
2
- 请从中选取 client:
- 选第一个,再点击 →
- 机器类型 使用默认:
- 点击 保存
// POST http://127.0.0.1:8080//cluster/assign/single_server/My-Cluster-8866 { "clusterMap": { "machineId": "10.32.51.130@8720", "ip": "10.32.51.130", "port": 18730, "clientSet": [ "10.32.51.130@8721" ], "belongToApp": true, "maxAllowedQps": 2 }, "remainingList": [] }
- 点击 簇点链路:http://127.0.0.1:8080/#/dashboard/identity/My-Cluster-8866
- 在
sayHello
列,点击 + 流控:- 是否集群 打勾
- 集群阈值模式 选
总体阈值
- 集群阈值 设置为:
2
- 其他使用默认值
- 点击 新增
// POST http://127.0.0.1:8080//v1/flow/rule { "enable": false, "strategy": 0, "grade": 1, "controlBehavior": 0, "resource": "sayHello", "limitApp": "default", "clusterMode": true, "clusterConfig": { "thresholdType": "1" }, "app": "My-Cluster-8866", "ip": "10.32.51.130", "port": "8720", "count": 2 }
- 在
再测试
- 连续刷 3 次:http://localhost:10010/hello/test1
- 出现:
Oops, [test1] blocked by Sentinel
原理
改模式
- Path:
/cluster/assign/single_server/{app}
控制台
com.alibaba.csp.sentinel.dashboard.controller.cluster.ClusterAssignController
@RestController
@RequestMapping("/cluster/assign")
public class ClusterAssignController {
// 全路径为: /cluster/assign/single_server/{app}
@PostMapping("/single_server/{app}")
public Result<ClusterAppAssignResultVO> apiAssignSingleClusterServersOfApp(
@PathVariable String app,
@RequestBody ClusterAppSingleServerAssignRequest assignRequest
) {
... // 参数校验
try {
return Result.ofSuccess(
clusterAssignService.applyAssignToApp( // ref: sign_m_100
app, Collections.singletonList(assignRequest.getClusterMap()),
assignRequest.getRemainingList()
)
);
} ... // catch
}
}
com.alibaba.csp.sentinel.dashboard.service.ClusterAssignServiceImpl
@Service
public class ClusterAssignServiceImpl implements ClusterAssignService {
// sign_m_100
@Override
public ClusterAppAssignResultVO applyAssignToApp(
String app, List<ClusterAppAssignMap> clusterMap,
Set<String> remainingSet
) {
... // 参数校验
// 更改集群模式--为服务端 & 推送配置
clusterMap.stream()
.filter(Objects::nonNull)
.filter(ClusterAppAssignMap::getBelongToApp)
.map(e -> {
String ip = e.getIp(); // 传过来的值为: 10.32.51.130 (相当于连接到控制台的 Sentinel 使用者的机器 IP)
int commandPort = parsePort(e); // 从 "machineId": "10.32.51.130@8720" 里取,取的值为: 8720
CompletableFuture<Void> f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_SERVER) // 改集群模式, ref: sign_m_101
.thenCompose(v -> applyServerConfigChange(app, ip, commandPort, e)); // 推送配置, ref: sign_m_102
return Tuple2.of(e.getMachineId(), f);
})
.forEach(t -> handleFutureSync(t, failedServerSet)); // 失败或超时 (10s) 则记录到集合里
// 更改集群模式--为客户端 & 推送配置
clusterMap.parallelStream()
.filter(Objects::nonNull)
.forEach(e -> applyAllClientConfigChange(app, e, failedClientSet)); // 改集群模式为客户端并推送配置, ref: sign_m_110
// 解绑剩余 (未分配的) 机器
applyAllRemainingMachineSet(app, remainingSet, failedClientSet);
return new ClusterAppAssignResultVO() ... // 组装失败的 Client/Server 集合并返回
}
// sign_m_101 改集群模式
private CompletableFuture<Void> modifyMode(String ip, int port, int mode) {
return sentinelApiClient.modifyClusterMode(ip, port, mode); // 命令为: setClusterMode, 处理者 ref: sign_c_110
}
// sign_m_102 推送配置给服务端
private CompletableFuture<Void> applyServerConfigChange(
String app, String ip, int commandPort,
ClusterAppAssignMap assignMap
) {
ServerTransportConfig transportConfig = new ServerTransportConfig()
.setPort(assignMap.getPort()) // 传过来的值为: 18730
.setIdleSeconds(600);
/**
* 推送服务端配置 (服务端口)
* 命令为: cluster/server/modifyTransportConfig
*/
return sentinelApiClient.modifyClusterServerTransportConfig(app, ip, commandPort, transportConfig)
.thenCompose(v -> applyServerFlowConfigChange(app, ip, commandPort, assignMap)) // 推送集群整体流控 QPS, ref: sign_m_103
.thenCompose(v -> applyServerNamespaceSetConfig(app, ip, commandPort, assignMap)); // 传过来的 namespaceSet 为空,不会处理,略...
}
// sign_m_103 推送集群整体流控 QPS
private CompletableFuture<Void> applyServerFlowConfigChange(
String app, String ip, int commandPort,
ClusterAppAssignMap assignMap
) {
Double maxAllowedQps = assignMap.getMaxAllowedQps();
... // maxAllowedQps 校验。注: QPS 大于 20w 则不推送
/**
* 推送集群流控配置
* 命令为: cluster/server/modifyFlowConfig
*/
return sentinelApiClient.modifyClusterServerFlowConfig(app, ip, commandPort,
new ServerFlowConfig().setMaxAllowedQps(maxAllowedQps));
}
// sign_m_110 改集群模式为客户端并推送配置
private void applyAllClientConfigChange(
String app, ClusterAppAssignMap assignMap,
Set<String> failedSet
) {
Set<String> clientSet = assignMap.getClientSet();
... // 空校验
final String serverIp = assignMap.getIp(); // 集群服务端 IP: 10.32.51.130
final int serverPort = assignMap.getPort(); // 集群服务端端口: 18730
clientSet.stream()
.map(MachineUtils::parseCommandIpAndPort)
.filter(Optional::isPresent)
.map(Optional::get)
.map(ipPort -> { // 将客户端机器 ID 用 @ 符截取成 [ip, port] 元组
CompletableFuture<Void> f = sentinelApiClient
/**
* 改集群模式为客户端,命令为: setClusterMode
* 处理者 ref: sign_c_110
*/
.modifyClusterMode(ipPort.r1, ipPort.r2, ClusterStateManager.CLUSTER_CLIENT)
/**
* 推送客户端配置 (服务端的 IP 和端口)
* 命令为: cluster/client/modifyConfig
*/
.thenCompose(v -> sentinelApiClient.modifyClusterClientConfig(app, ipPort.r1, ipPort.r2,
new ClusterClientConfig().setRequestTimeout(20)
.setServerHost(serverIp)
.setServerPort(serverPort)
));
return Tuple2.of(ipPort.r1 + '@' + ipPort.r2, f);
})
.forEach(t -> handleFutureSync(t, failedSet)); // 失败或超时 (10s) 则记录到集合里
}
}
应用机器
com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler
// sign_c_110 改集群模式命令处理者
@CommandMapping(name = "setClusterMode", desc = "set cluster mode...")
public class ModifyClusterModeCommandHandler implements CommandHandler<String> {
@Override
public CommandResponse<String> handle(CommandRequest request) {
try {
int mode = Integer.valueOf(request.getParam("mode"));
... // 对模式对应的 SPI 的校验
ClusterStateManager.applyState(mode); // 改模式, ref: sign_m_200
return CommandResponse.ofSuccess("success");
} ... // catch
}
}
com.alibaba.csp.sentinel.cluster.ClusterStateManager
DynamicSentinelProperty
参考:入口控制-设置规则 sign_c_100
public final class ClusterStateManager {
private static volatile SentinelProperty<Integer> stateProperty = new DynamicSentinelProperty<Integer>();
private static final PropertyListener<Integer> PROPERTY_LISTENER = new ClusterStatePropertyListener();
static {
InitExecutor.doInit();
stateProperty.addListener(PROPERTY_LISTENER);
}
// sign_m_200 改模式
public static void applyState(Integer state) {
stateProperty.updateValue(state); // 最终触发内部处理, ref: sign_m_210
}
private static class ClusterStatePropertyListener implements PropertyListener<Integer> {
... // configLoad 实现
@Override
public synchronized void configUpdate(Integer value) {
applyStateInternal(value);
}
}
// sign_m_210 状态更改处理
private static boolean applyStateInternal(Integer state) {
... // state 校验
try {
switch (state) {
case CLUSTER_CLIENT:
return setToClient(); // 设置为客户端模式, ref: sign_m_211
case CLUSTER_SERVER:
return setToServer(); // 设置为服务端模式, ref: sign_m_215
case CLUSTER_NOT_STARTED:
setStop();
return true;
... // default 处理
}
} ... // catch
}
// sign_m_211 设置为客户端模式
public static boolean setToClient() {
if (mode == CLUSTER_CLIENT) {
return true;
}
mode = CLUSTER_CLIENT;
sleepIfNeeded(); // 转换完之后,至少要等 5s 才能换模式
lastModified = TimeUtil.currentTimeMillis();
return startClient(); // ref: sign_m_212
}
// sign_m_212 启动客户端并关闭服务端
private static boolean startClient() {
try {
EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer();
if (server != null) {
server.stop();
}
ClusterTokenClient tokenClient = TokenClientProvider.getClient();
if (tokenClient != null) { // 默认实现为 DefaultClusterTokenClient, ref: sign_c_230
tokenClient.start(); // ref: sign_m_230
RecordLog.info("[ClusterStateManager] Changing cluster mode to client");
return true;
} ... // else
} ... // catch
}
// sign_m_215 设置为服务端模式
public static boolean setToServer() {
... // 同样要等 5s 才能换模式
return startServer(); // ref: sign_m_216
}
// sign_m_216 启动服务端并关闭客户端
private static boolean startServer() {
try {
ClusterTokenClient tokenClient = TokenClientProvider.getClient();
if (tokenClient != null) {
tokenClient.stop();
}
EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer();
if (server != null) { // 默认实现为 DefaultEmbeddedTokenServer, ref: sign_c_220
server.start(); // ref: sign_m_218
RecordLog.info("[ClusterStateManager] Changing cluster mode to server");
return true;
} ... // else
} ... // catch
}
}
启用服务端
com.alibaba.csp.sentinel.cluster.server.DefaultEmbeddedTokenServer
/** sign_c_220 默认内嵌 Token 服务器 */
public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {
// 相当于对此进行一层封装
private final ClusterTokenServer server = new SentinelDefaultTokenServer(true); // ref: sign_cm_221
// sign_m_218
@Override
public void start() throws Exception {
server.start(); // ref: sign_m_220
}
}
com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer
public class SentinelDefaultTokenServer implements ClusterTokenServer {
// sign_cm_221
public SentinelDefaultTokenServer(boolean embedded) {
this.embedded = embedded;
ClusterServerConfigManager.addTransportConfigChangeObserver(new ServerTransportConfigObserver() {
@Override
public void onTransportConfigChange(ServerTransportConfig config) {
changeServerConfig(config); // ref: sign_m_221
}
});
initNewServer(); // 只是创建,并未启动...
}
// sign_m_220
@Override
public void start() throws Exception {
if (shouldStart.compareAndSet(false, true)) {
startServerIfScheduled(); // ref: sign_m_222
}
}
// sign_m_221
private synchronized void changeServerConfig(ServerTransportConfig config) {
...
int newPort = config.getPort();
... // 新端口与旧端口相同,则返回。(相当于界面用非默认端口 `18730`,则会启动 2 次 Netty 服务)
try {
if (server != null) {
stopServer();
}
this.server = new NettyTransportServer(newPort);
this.port = newPort;
startServerIfScheduled();
} ... // catch
}
// sign_m_222
private void startServerIfScheduled() throws Exception {
if (shouldStart.get()) {
if (server != null) {
server.start(); // 启动 Netty 服务, ref: sign_m_223
ClusterStateManager.markToServer();
if (embedded) {
handleEmbeddedStart();
}
}
}
}
}
Netty-服务
com.alibaba.csp.sentinel.cluster.server.NettyTransportServer
public class NettyTransportServer implements ClusterTokenServer {
private final int port;
public NettyTransportServer(int port) {
this.port = port;
}
// sign_m_223 启动 Netty 服务
@Override
public void start() {
if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) {
return;
}
ServerBootstrap b = new ServerBootstrap();
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
p.addLast(new NettyRequestDecoder());
p.addLast(new LengthFieldPrepender(2));
p.addLast(new NettyResponseEncoder());
p.addLast(new TokenServerHandler(connectionPool));
}
})
... // childOption
b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
...
try {
Thread.sleep(failCount * RETRY_SLEEP_MS); // 等待 n * 2s
start(); // 有异常,则尝试 3 次启动, ref: sign_m_223
} ... // catch
} else {
// 没有异常,则改状态为启动完成
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED);
}
}
});
}
}
启用客户端
com.alibaba.csp.sentinel.cluster.client.DefaultClusterTokenClient
/** sign_c_230 默认客户端 */
public class DefaultClusterTokenClient implements ClusterTokenClient {
// SPI 加载时会调用
public DefaultClusterTokenClient() {
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
@Override
public void onRemoteServerChange(ClusterClientAssignConfig assignConfig) {
changeServer(assignConfig); // ref: sign_m_231
}
});
initNewConnection(); // host 为空,不会创建连接...
}
// sign_m_230
@Override
public void start() throws Exception {
if (shouldStart.compareAndSet(false, true)) {
startClientIfScheduled(); // ref: sign_m_232
}
}
// sign_m_231
private void changeServer(ClusterClientAssignConfig config) {
... // 服务端的 IP 和端口没改,则返回不处理
try {
if (transportClient != null) {
transportClient.stop();
}
this.transportClient = new NettyTransportClient(config.getServerHost(), config.getServerPort());
... // 记录配置
startClientIfScheduled(); // ref: sign_m_232
} ... // catch
}
// sign_m_232
private void startClientIfScheduled() throws Exception {
if (shouldStart.get()) {
if (transportClient != null) {
transportClient.start(); // 启动 Netty 客户端连接, ref: sign_m_233
} ... // else
}
}
}
Netty-客户端连接
com.alibaba.csp.sentinel.cluster.client.NettyTransportClient
public class NettyTransportClient implements ClusterTransportClient {
private final String host;
private final int port;
public NettyTransportClient(String host, int port) {
... // 校验参数
this.host = host;
this.port = port;
}
// sign_m_233 启动 Netty 客户端连接
@Override
public void start() throws Exception {
shouldRetry.set(true);
startInternal(); // ref: sign_m_234
}
// sign_m_234
private void startInternal() {
connect(initClientBootstrap()); // sign_m_235 || sign_m_236
}
// sign_m_235
private Bootstrap initClientBootstrap() {
Bootstrap b = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
... // option
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
clientHandler = new TokenClientHandler(currentState, disconnectCallback);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
pipeline.addLast(new NettyResponseDecoder());
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new NettyRequestEncoder());
pipeline.addLast(clientHandler);
}
});
return b;
}
// sign_m_236
private void connect(Bootstrap b) {
if (currentState.compareAndSet(CLIENT_STATUS_OFF, CLIENT_STATUS_PENDING)) {
b.connect(host, port)
.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) { // 有异常,连接失败
... // log
failConnectedTime.incrementAndGet();
channel = null;
} else { // 无异常,连接成功
failConnectedTime.set(0);
channel = future.channel();
... // log
}
}
});
}
}
}
更新规则
- 参考:WebUI-更新规则-应用端 sign_c_200
- 服务端并没有将规则同步到客户端
- 更新规则的入口在
ModifyClusterFlowRulesCommandHandler ( cluster/server/modifyFlowRules )
ClusterFlowRuleManager #loadRules
ClusterFlowRuleManager #FLOW_RULES
- 集群规则现在还没有同步,需要自己改造控制台
- ref: https://github.com/alibaba/Sentinel/issues/1670
- 可用 V2,也可用配置源
- 然后在
ModifyRulesCommandHandler
用 SPI 做下扩展
- 然后在
- 最主要的是要保持
ruleId
一致
流控原理
参考:
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker
public class FlowRuleChecker {
// sign_m_411 集群流控 (调用者, ref: sign_m_721)
private static boolean passClusterCheck(
FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized
) {
try {
// 客户端模式下,SPI 提供的实现者为: DefaultClusterTokenClient, ref: sign_c_421
TokenService clusterService = pickClusterService();
... // clusterService 为空则回退到本地单机校验
long flowId = rule.getClusterConfig().getFlowId();
// 根据规则 id 请求 token 服务器,以获取流控结果. ref: sign_m_421
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized); // ref: sign_m_412
} ... // catch
// 回退到本地单机校验
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
// sign_m_412 根据流控结果进行处理
private static boolean applyTokenResult(
TokenResult result, FlowRule rule, Context context,
DefaultNode node,
int acquireCount, boolean prioritized
) {
switch (result.getStatus()) {
case TokenResultStatus.OK:
return true; // 放行
case TokenResultStatus.SHOULD_WAIT:
try {
Thread.sleep(result.getWaitInMs());
} catch (InterruptedException e) {
e.printStackTrace();
}
return true; // 等待指定时间后放行
case TokenResultStatus.NO_RULE_EXISTS:
case TokenResultStatus.BAD_REQUEST:
case TokenResultStatus.FAIL:
case TokenResultStatus.TOO_MANY_REQUEST:
// 回退到本地单机校验
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
case TokenResultStatus.BLOCKED:
default:
return false;
}
}
}
com.alibaba.csp.sentinel.cluster.client.DefaultClusterTokenClient
// sign_c_421
public class DefaultClusterTokenClient implements ClusterTokenClient {
// sign_m_421 请求 token 服务器,获取流控结果
@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
...
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setPriority(prioritized);
// 此请求的处理者为 FlowRequestProcessor, ref: sign_c_510 | sign_m_510
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
TokenResult result = sendTokenRequest(request); // ref: sign_m_422
...
return result;
} ... // catch
}
// sign_m_422
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
...
// 通过 Netty 客户端 (NettyTransportClient) 发送
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData) response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
}
return result;
}
}
服务端流控
com.alibaba.csp.sentinel.cluster.server.processor.FlowRequestProcessor
// sign_c_510 流控请求处理器
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
// sign_m_510
@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
// 服务端模式下,SPI 提供的实现者为 DefaultTokenService, ref: sign_c_521
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
boolean prioritized = request.getData().isPriority();
TokenResult result = tokenService.requestToken(flowId, count, prioritized); // ref: sign_m_521
return toResponse(result, request); // ref: sign_m_511
}
// sign_m_511
private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
new FlowTokenResponseData()
.setRemainingCount(result.getRemaining())
.setWaitInMs(result.getWaitInMs())
);
}
}
com.alibaba.csp.sentinel.cluster.flow.DefaultTokenService
// sign_c_521
@Spi(isDefault = true)
public class DefaultTokenService implements TokenService {
// sign_m_521
@Override
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
...
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) { // 测试的时候,为 null (因为没同步)
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
// 集群流控校验 (逻辑较简单。集群指标在 ClusterFlowRuleManager #applyClusterFlowRule 里初始化)
return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}
}
总结
- 集群流控时,每次请求都会转发到服务端(由服务端去控制)