分布式微服务系统架构第163集:哈罗电池设备Netty网关架构

发布于:2025-08-13 ⋅ 阅读:(22) ⋅ 点赞:(0)

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

https://github.com/webVueBlog/fastapi_plus

https://webvueblog.github.io/JavaPlusDoc/

点击勘误issues,哪吒感谢大家的阅读

  • 读取 gateway.url.firewall 配置(逗号分隔的白名单路径)

  • 注册两个拦截器:

  1. 你的自定义拦截器 getWebInterceptor(),并对白名单 excludePathPatterns(...) 放行

  2. sentinelWebInterceptor(限流用),对所有路径生效

存在的常见坑

  1. 继承 WebMvcConfigurationSupport 会禁用 Spring Boot 的自动配置(静态资源、消息转换器等都要你手动配)——通常不建议。

  2. @RefreshScope 放在这个类上,白名单并不会“热更新”excludePathPatterns 是启动时就定死的)。

  3. 手动 replaceAll("\\s","")+split(",") 容易踩空格/空值坑,Spring Boot 自带的类型绑定更稳。


推荐实现 A:常规(重启后生效,最简单稳定)

👉 用 WebMvcConfigurer + **配置绑定 @ConfigurationProperties**。

这版最接近你现有逻辑,但需要重启或重新部署后才会生效。

1) 属性绑定(把白名单直接绑成 List)

// FirewallProperties.java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;

@Component
@ConfigurationProperties(prefix = "gateway.url")
public class FirewallProperties {
    /**
     * 白名单路径,如:/actuator/**, /public/**, /error
     */
    private List<String> firewall = new ArrayList<>();

    public List<String> getFirewall() { return firewall; }
    public void setFirewall(List<String> firewall) { this.firewall = firewall; }
}

2) 你的业务拦截器(示例骨架)

// WebInterceptor.java
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;

@Component
public class WebInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
            throws Exception {
        // TODO 放你的鉴权、签名校验、日志脱敏、IP 限制等逻辑
        return true; // 返回 false 则拦截
    }
}

3) 注册拦截器(不再继承 WebMvcConfigurationSupport)

// WebMvcConfig.java
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
@RequiredArgsConstructor
public class WebMvcConfig implements WebMvcConfigurer {

    private final WebInterceptor webInterceptor;
    private final FirewallProperties firewallProperties;
    private final SentinelWebInterceptor sentinelWebInterceptor; // 来自 Sentinel 依赖,能自动注入

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 1) 先执行:你的业务/防火墙拦截器(对白名单放行)
        registry.addInterceptor(webInterceptor)
                .addPathPatterns("/**")
                .excludePathPatterns(firewallProperties.getFirewall())
                .order(-10); // 数字越小越先执行

        // 2) 再执行:限流 Sentinel
        registry.addInterceptor(sentinelWebInterceptor)
                .addPathPatterns("/**")
                .order(0);
    }
}

4) application.yml 示例

gateway:
  url:
    firewall:
      - /actuator/**
      - /public/**
      - /error
      - /health

推荐实现 B:需要热更新白名单(不重启也生效)

👉 把白名单“匹配逻辑”放进拦截器,每次请求都按最新配置判断放行;用 @RefreshScope 只刷新数据,不刷新 Spring MVC 映射。

1) 可刷新的白名单配置

// FirewallProperties.java
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;

@RefreshScope
@Component
@ConfigurationProperties(prefix = "gateway.url")
public class FirewallProperties {
    private List<String> firewall = new ArrayList<>();
    public List<String> getFirewall() { return firewall; }
    public void setFirewall(List<String> firewall) { this.firewall = firewall; }
}

2) 在拦截器里自己判断“白名单即放行”

// WebInterceptor.java
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.servlet.HandlerInterceptor;

@Component
public class WebInterceptor implements HandlerInterceptor {
    private final FirewallProperties props;
    private final AntPathMatcher matcher = new AntPathMatcher();

    public WebInterceptor(FirewallProperties props) {
        this.props = props;
    }

    @Override
    public boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object handler)
            throws Exception {
        String uri = req.getRequestURI();
        // 命中白名单 => 直接放行(支持 /public/** 这类通配)
        for (String p : props.getFirewall()) {
            if (matcher.match(p, uri)) return true;
        }

        // 非白名单 => 你的校验逻辑
        // TODO 签名、token、IP、频率限制等
        return true;
    }
}

3) 注册时就不要再用 excludePathPatterns 了

@Configuration
@RequiredArgsConstructor
public class WebMvcConfig implements WebMvcConfigurer {

    private final WebInterceptor webInterceptor;
    private final SentinelWebInterceptor sentinelWebInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(webInterceptor)
                .addPathPatterns("/**")
                .order(-10);

        registry.addInterceptor(sentinelWebInterceptor)
                .addPathPatterns("/**")
                .order(0);
    }
}

这样,Nacos/Config 刷新 gateway.url.firewall 后,下一次请求立刻按新白名单生效


必要依赖(示例)

<!-- Spring Web -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 如果用 Sentinel 的 Web 拦截器 -->
<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-parameter-flow-control</artifactId>
  <version>…</version>
</dependency>

<!-- 如果要 @RefreshScope(Spring Cloud Config / Nacos)-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter</artifactId>
</dependency>

使用顺序(小白版)

  1. 把上面 3 个类(或 5 个类,取 A 或 B 方案)放进你的工程。

  2. 在 application.yml 写好白名单路径(可以先放 /public/**/actuator/**)。

  3. 启动项目:

  • 访问白名单里的地址(应直接通过)

  • 访问非白名单地址(应走你的拦截逻辑)

  • 如果你需要热更新:用“方案 B”,在配置中心改白名单,/actuator/refresh 或自动刷新后,不用重启就生效。


  • 小技巧 & 说明

    • 顺序order 数字越小越先执行,通常“白名单/鉴权”优先,再限流。

    • Spring Boot 3 默认使用 PathPattern,AntPathMatcher 也可以在拦截器里用,通配语法基本一致(/a/**/a/*)。

    • **不要再继承 WebMvcConfigurationSupport**,除非你明确要完全接管 MVC 配置。

    • 反代之后取真实 IP:在拦截器里用 X-Forwarded-For(记得在网关/NGINX 开启透传)。

    总体架构(逻辑视图)

    ┌─────────┐    TCP(4G/物联)      ┌──────────────┐      ┌─────────────┐
    │  设备端  │ ───────────────────▶ │ Nginx/L4 LB  │ ───▶ │ Netty 网关xN │
    └─────────┘  (IP Hash/一致哈希)   └──────┬───────┘      └──────┬──────┘
                                             │Consistent Hash       │
                                             │路由一致性            │
                                             ▼                      ▼
                                       ┌─────────┐            ┌──────────┐
                                       │ Redis   │◀─租约TTL──▶│ 网关租约  │
                                       │ (Cluster│   device→  │ 管理(Lua) │
                                       └────┬────┘   gateway  └────┬─────┘
                                            │                     上报/续签
                                            │                             
                             指令下发/状态流 │                              事件消息
                                            ▼                              ▼
                                       ┌──────────┐                 ┌──────────┐
                                       │ Kafka    │◀───────────────▶│ 业务集群 │
                                       │  topics  │   (事件/状态)    │ (消费/存储│
                                       └────┬─────┘                 │ 策略/风控)│
                                            │                       └──────────┘
                                     ┌───────▼────────┐
                                     │ 监控&日志链路  │(Prometheus/Grafana/ELK/Jaeger)
                                     └────────────────┘

    端到端数据流

    1. 设备上线:设备→Nginx→某台网关(按一致哈希)。

    2. 握手&鉴权:网关校验签名/密钥→通过后在 Redis 写 device→gateway 映射并设置 TTL(租约)。

    3. 心跳保活:设备心跳;网关定期续签 Redis TTL。

    4. 上报数据:网关解码→校验→异步写 Kafka(at-least-once),关键字段落地(如设备在线态、最近电压等可走 Redis/时序库)。

    5. 指令下发:业务集群→查 device→gateway →RPC/消息投递到对应网关→Netty 写回设备。

    6. 故障切换:网关故障→租约不再续签到期→Nginx 一致哈希将新连接打到其他网关→新网关接管。

    关键模块

    • 接入层(Nginx Stream/L4 LB) :一致性哈希到网关;健康检查;连接数限流。

    • 连接管理(Netty) :epoll+直连堆外缓冲、连接表(ConcurrentHashMap)、空闲检测(IdleStateHandler)、背压(writeBufferWaterMark)。

    • 协议编解码:JT/T808 或自定义帧(LengthFieldBasedFrameDecoder/Protobuf/自定义 CRC 校验)。

    • 鉴权&会话:设备密钥/签名、时间戳;会话对象绑定 Channel,支持幂等上线。

    • 租约与映射(Redis) :HSET device→gateway + EXPIRE 或 SETEX;Lua 原子续约/踢同端。

    • 消息总线(Kafka) :上行事件/状态(topics 分区=一致性Key),下行指令回执。

    • 指令路由:先查 Redis 查到网关IP→RPC/HTTP 到该网关内置“下发接口”→Channel 定位并写。

    • 可观测性:Prometheus 指标(连接数、心跳延迟、解码失败、写队列积压、指令RT/成功率);日志脱敏;Trace(可选)。

    • 安全:黑名单、频率限制;签名+重放防护(nonce+时窗);数据加密(可选)。

    Redis 关键键设计(示例)

    # 设备→网关映射与租约(60s 心跳,TTL 180s)
    Key: dev:lease:{deviceId}  Value: {gatewayId}|{leaseVersion}|{ts}, TTL=180
    
    # 网关活跃集
    Key: gw:alive                 Set(gatewayId), 网关心跳维护
    
    # 设备在线态
    Key: dev:online:{deviceId}    0/1 + lastSeenTs

    用 Lua 保证:同一设备的“续约/接管/踢同端”原子化,且校验版本号避免并发覆盖。

    Nginx stream 一致性哈希(示例)

    stream {
      log_format basic '$remote_addr:$remote_port -> $server_addr:$server_port $status';
      access_log /var/log/nginx/stream_access.log basic;
    
      upstream gateway_pool {
        hash $remote_addr consistent;    # 可切换为 $ssl_preread_server_name 或 $proxy_protocol_addr
        server 10.0.0.11:9000 max_fails=3 fail_timeout=10s;
        server 10.0.0.12:9000 max_fails=3 fail_timeout=10s;
        server 10.0.0.13:9000 max_fails=3 fail_timeout=10s;
      }
    
      server {
        listen 7000;                     # 设备 TCP 入口
        proxy_connect_timeout 3s;
        proxy_timeout 3600s;
        proxy_pass gateway_pool;
      }
    }

    Netty Pipeline 建议

    ch.pipeline()
      .addLast("idle", new IdleStateHandler(90, 0, 0, TimeUnit.SECONDS))  // 读空闲=心跳丢失
      .addLast("frame", new LengthFieldBasedFrameDecoder(64 * 1024, 2, 2, 0, 0))
      .addLast("codec", new DeviceMessageCodec())        // 自定义编解码/CRC/签名
      .addLast("auth", new AuthHandler(redis, config))   // 首包鉴权, 通过后移除
      .addLast("hb", new HeartbeatHandler(redis))        // 续租、更新 lastSeen
      .addLast("biz", new UplinkHandler(kafkaProducer))  // 上行落 Kafka
      .addLast("ack", new DownlinkAckHandler(kafkaProducer)) // 回执
    ;

    Channel 选项

    bootstrap.option(ChannelOption.SO_BACKLOG, 4096)
             .childOption(ChannelOption.TCP_NODELAY, true)
             .childOption(ChannelOption.SO_REUSEADDR, true)
             .childOption(ChannelOption.SO_RCVBUF, 256*1024)
             .childOption(ChannelOption.SO_SNDBUF, 256*1024)
             .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                 new WriteBufferWaterMark(8*1024, 32*1024));

    心跳 & 自愈(核心逻辑)

    • 设备心跳间隔:60s;网关读空闲>90s 视为掉线,关闭 Channel。

    • 网关续租:每 60s 执行 Lua:若 dev:lease:{id} 的 gatewayId 为本机且版本匹配→刷新 TTL;否则不续。

    • 网关宕机:不续租→TTL 到期(~180s)→映射消失→设备重连被 Nginx 分配到新网关→新网关用“接管 Lua”将 lease 版本+1 并写入。

    指令下发路径

    1. 业务调用:POST /cmd/send {deviceId, cmd, payload, timeout}

    2. 网关内部:根据 deviceId 查 Channel;若在线→写并等待 ACK;不在线→返回离线或入延时队列(可选)。

    3. 幂等:业务侧自带 requestId;ACK 携带同 ID;Kafka 侧对回执做去重。

    Kafka Topic 规划(示例)

    • device.uplink.raw(分区 key=deviceId):原始上报

    • device.status.event:上线/下线/心跳异常

    • device.cmd.ack:指令回执

    • device.metric:电压/温度/故障码等指标

    Lua 核心(伪代码)

    -- KEYS[1]=dev:lease:{deviceId}, ARGV={gatewayId, version, nowTs, ttl}
    local v = redis.call('GET', KEYS[1])
    if not v then
      redis.call('SET', KEYS[1], ARGV[1]..'|'..ARGV[2]..'|'..ARGV[3], 'EX', ARGV[4])
      return 'NEW'
    end
    local parts = {}
    for s in string.gmatch(v, "([^|]+)") do table.insert(parts, s) end
    local curGw = parts[1]; local curVer = tonumber(parts[2]) or 0
    if curGw == ARGV[1] and tonumber(ARGV[2]) == curVer then
      redis.call('SET', KEYS[1], curGw..'|'..curVer..'|'..ARGV[3], 'EX', ARGV[4])
      return 'RENEW'
    end
    -- 可加版本比较或踢同端策略
    return 'MISMATCH'

    JVM/内核/容量建议(单机 10~15万长连接基线)

    • JVM:JDK17;G1/ ZGC(内存≥32G可选 ZGC);-XX:MaxGCPauseMillis=100;堆 8~16G,直内存与堆外缓冲(PooledByteBufAllocator)≥堆;禁用偏向锁。

    • Netty-Dio.netty.allocator.type=pooled,开启 recycler.maxCapacity 合理回收。

    • 内核ulimit -n 1,000,000net.ipv4.tcp_tw_reuse=1somaxconn=65535tcp_max_syn_backlog=262144

    • 网卡:多队列中断、RPS/RFS;关闭大包分片导致的异常。

    • 压测:虚拟设备+连接增长/抖动/秒级断线重连;关注连接建立速率、写队列水位、99线RT。

    灰度与降级

    • 解码失败/异常比率阈值→自动降级(限流/丢弃高频非关键事件)。

    • 业务侧拥塞→下行指令限流/队列长度报警。

    • Redis/Kafka 抖动→本地缓存短暂兜底(Caffeine + TTL 5~15s)。

    扩容路线图(分阶段)

    • 阶段A(≤50万设备) :2~3 台网关 + Redis 主从或小集群 + 3 节点 Kafka;Nginx 单层。

    • 阶段B(≤500万) :网关水平扩展至 10+;Redis Cluster(69 节点);Kafka 57 节点,跨 AZ;NLB/SLB 健康检查。

    • 阶段C(≥2000万) :网关分区域池;跨区域 Redis/Kafka 双活(异步复制);Topic 分域;Trident/流计算侧做实时聚合降噪;自研 L4/五元组路由。

    最小可运行骨架(关键点)

    • 模块gateway-boot(Spring Boot)/ gateway-core(Netty)/ gateway-protocol(编解码)/ gateway-admin(运维面板)

    • 接口

      • POST /internal/cmd/send(下发)

      • GET /internal/channel/{deviceId}(查询在线/所在网关)

      • /actuator/prometheus(指标)

    1. 本地缓存(App 内存 & 手机存储)

    • 抖音 App 在你刷视频、看直播、浏览用户主页时,会在本地存储中缓存:

      • 视频封面、小视频片段(加快二次播放)

      • 你访问过的视频 ID / 位置

      • 预加载过的评论 / 互动数据

    • 这些数据通常保存在 App 的缓存目录中(安卓在 /Android/data/com.ss.android.ugc.aweme/ 下),
      清除缓存或卸载 App 就会删除。

    • 本地的 浏览历史 UI(“观看记录”)也是从服务器接口拉取,不是只存在本地。


    2. 服务器端记录(账号 / 设备绑定)

    • 抖音会在服务器端记录你账号下的浏览行为:

      • 观看的视频 ID 和时间

      • 停留时长、点赞/评论/分享等交互

      • 搜索词、关注记录、直播间观看记录

    • 即使你清除本地缓存或换设备,只要登录同一个账号,这些记录依然存在(用于个性化推荐)。

    • 未登录账号时,抖音也会基于 设备 ID、IP、浏览指纹 做一定的记录,主要用于推荐算法优化。


    3. 如何清除或减少记录

    • 在抖音 App 里我 → 右上角菜单 → 设置与隐私 → 浏览记录 可以关闭“观看记录”功能并清空已有记录。

    • 清除本地缓存设置与隐私 → 存储空间 → 清理缓存

    • 限制后台跟踪

      • 安卓可用“限制广告跟踪”或“隐私空间”

      • iOS 可在 设置 → 隐私 → 广告 → 限制广告跟踪 关闭跟踪

    • 如果需要完全不被记录,必须:

      • 不登录账号

      • 定期更换设备 ID / 清空 App 数据

      • 或通过代理 / 沙盒隔离运行 App(影响体验)