分布式微服务系统架构第160集:百万台设备Netty网关架构

发布于:2025-08-10 ⋅ 阅读:(13) ⋅ 点赞:(0)

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

https://1024bat.cn/

https://github.com/webVueBlog/fastapi_plus

https://webvueblog.github.io/JavaPlusDoc/

点击勘误issues,哪吒感谢大家的阅读

bootstrap = new ServerBootstrap()

创建 ServerBootstrap 实例,是 Netty 服务端启动的核心辅助类,用于配置服务器端的各项参数。

.group(bossGroup, workerGroup)

设置 主从线程组(EventLoopGroup) :

  • bossGroup:负责接收客户端连接请求;

  • workerGroup:负责处理客户端的 I/O 读写操作。

.channel(NioServerSocketChannel.class)

设置服务器通道类型:

  • 使用基于 NIO 的 NioServerSocketChannel,这是 Netty 为服务器端提供的异步非阻塞通道。

.option(ChannelOption.SO_BACKLOG, 1024)

设置服务器端套接字的 连接队列大小

  • 对应 Java 原生 ServerSocketChannel 的 backlog 参数;

  • 表示连接请求的等待队列大小(尚未被 accept 的连接数)最大为 1024。

.option(ChannelOption.SO_REUSEADDR, true)

允许地址复用:

  • 服务器重启时,允许重新绑定处于 TIME_WAIT 状态的端口;

  • 对于高并发环境可提高端口复用效率。

.childOption(ChannelOption.TCP_NODELAY, true)

关闭 Nagle 算法,开启小包即时发送:

  • true 表示禁用 Nagle 算法,提升低延迟通信效率;

  • 适用于实时性要求较高的应用场景(如物联网设备)。

.childOption(ChannelOption.SO_KEEPALIVE, true)

开启 TCP 心跳机制(保活):

  • 用于检测长时间空闲连接是否存活;

  • 防止客户端突然掉线服务端长期不知。

.childHandler(new IoTChannelInitializer());

设置子通道的初始化器:

  • 每当有新客户端连接时,IoTChannelInitializer 会为这个连接初始化 pipeline(添加编码器、解码器、业务处理器等);

  • 是处理每个 SocketChannel 的关键入口。


@Component
public class ConnectionManager {

声明一个 Spring Bean:

  • 使用 @Component 注解表示该类由 Spring 容器自动管理;

  • 常用于统一管理设备连接。


private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();

使用线程安全的 ConcurrentHashMap 存储设备连接:

  • key: 设备唯一标识(如 deviceId);

  • value: 对应的 Netty Channel,表示该设备的连接通道;

  • 便于实现设备连接状态的快速查找、管理与转发。


private final AtomicLong connectionCount = new AtomicLong(0);

使用 AtomicLong 统计当前连接数量,支持高并发下的线程安全自增/自减操作。


public void addConnection(String deviceId, Channel channel) {
        deviceChannels.put(deviceId, channel);
        connectionCount.incrementAndGet();

添加新连接:

  • 将设备与其连接的 Channel 建立映射;

  • 连接计数加 1;


channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));

为当前连接添加 心跳检测处理器

  • readerIdleTime = 60 秒:60 秒内未读取数据则触发 READER_IDLE

  • writerIdleTime = 30 秒:30 秒内未写入数据则触发 WRITER_IDLE

  • allIdleTime = 0:关闭 ALL_IDLE

  • 用于检测连接空闲状态,避免死连接或长时间不通信的设备;


public void removeConnection(String deviceId) {
        Channel channel = deviceChannels.remove(deviceId);

移除连接:

  • 从 deviceChannels 映射中移除该设备对应的 Channel;

  • 如果存在该连接则执行以下操作:


if (channel != null) {
            connectionCount.decrementAndGet();
            channel.close();
        }
  • 连接计数减 1;

  • 主动关闭对应的 Channel(断开连接);


public long getConnectionCount() {
        return connectionCount.get();
    }

获取当前连接数(用于监控系统负载、统计连接活跃度等);


1. 域名解析

iot-gateway.example.com
├── 192.168.1.10 (网关集群1)
├── 192.168.1.11 (网关集群2)
└── 192.168.1.12 (网关集群3)
  • 目的:让同一个域名指向多台网关服务器(每台运行 Netty 网关服务)。

  • 好处:方便客户端只用一个域名连接,Nginx 负责流量分发。

  • 这里的 192.168.1.x 是局域网 IP,生产环境可以替换为公网 IP 或云服务器内网 IP。


2. Nginx upstream 配置

upstream iot_gateway {
    # 一致性哈希,保证同一客户端IP始终分配到同一台网关
    hash $remote_addr consistent;
    
    server 192.168.1.10:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.11:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.12:8080 weight=1 max_fails=3 fail_timeout=30s;
}

关键点:

  • hash $remote_addr consistent;
    使用一致性哈希算法,根据 客户端 IP 分配服务器:

    • 保证同一个设备(相同 IP)总是连接到同一台网关;

    • 适合 长连接场景(IoT、WebSocket、Netty TCP);

    • consistent 关键字减少节点变动时的迁移影响。

  • server 192.168.1.x:8080
    每台网关服务监听 8080 端口。

  • weight=1
    每台服务器权重相等,均衡分配。

  • max_fails=3 fail_timeout=30s

    • 如果某个节点在 30 秒内失败 3 次,则临时剔除 30 秒;

    • 避免设备频繁连接到故障节点。


3. Nginx server 块

server {
    listen 80;
    location / {
        proxy_pass http://iot_gateway;
        proxy_connect_timeout 5s;
        proxy_timeout 60s;
    }
}

关键点:

  • listen 80;
    接收客户端 HTTP 请求(如果是 TCP/UDP 则需 stream 模块)。

  • proxy_pass http://iot_gateway;
    将流量转发给上面定义的 upstream 集群。

  • proxy_connect_timeout 5s;
    连接上游服务器超时时间,5 秒没连上就失败。

  • proxy_timeout 60s;
    与上游服务器保持连接的最大空闲时间。


4. ⚠ 注意:Netty TCP 场景下的 Nginx 配置

上面是 HTTP 代理模式,如果你的 IoT 网关是 TCP(非 HTTP) ,Nginx 要用 stream 块:

stream {
    upstream iot_gateway {
        hash$remote_addr consistent;
        server 192.168.1.10:8080;
        server 192.168.1.11:8080;
        server 192.168.1.12:8080;
    }

    server {
        listen 9000; # 客户端连接的 TCP 端口
        proxy_connect_timeout 5s;
        proxy_timeout 600s; # 长连接要设置大一些
        proxy_pass iot_gateway;
    }
}

这样才能转发原生 TCP 数据(Netty、JT808、MQTT 等)。


5. 整体流程图

设备(TCP连接)
     │  iot-gateway.example.com:9000
     ▼
  Nginx (一致性哈希)
     ├── 192.168.1.10:8080 → Netty 网关实例 1
     ├── 192.168.1.11:8080 → Netty 网关实例 2
     └── 192.168.1.12:8080 → Netty 网关实例 3

同一设备 → 固定网关 → 减少重连和会话迁移。

一、分布式 IoT 网关架构图

┌─────────────────────────┐
                │      IoT 设备 (TCP)      │
                └──────────────┬──────────┘
                               │ 域名: iot-gateway.example.com:9000
                               ▼
                  ┌─────────────────────────┐
                  │      Nginx (stream)      │
                  │ 一致性哈希 hash$remote_addr │
                  └──────────────┬──────────┘
                     ┌───────────┼───────────┐
                     ▼           ▼           ▼
       ┌────────────────┐  ┌────────────────┐  ┌────────────────┐
       │  Netty 网关 #1 │  │  Netty 网关 #2 │  │  Netty 网关 #3 │
       │ 192.168.1.10   │  │ 192.168.1.11   │  │ 192.168.1.12   │
       └───────┬────────┘  └───────┬────────┘  └───────┬────────┘
               │                   │                   │
               ▼                   ▼                   ▼
      ┌────────────────┐   ┌────────────────┐   ┌────────────────┐
      │ Redis 集群      │←→│ deviceId→网关映射│←→│ 宕机重分配逻辑 │
      └────────────────┘   └────────────────┘   └────────────────┘
               │
               ▼
      ┌────────────────┐
      │ Kafka 消息队列 │ ←→ 后端业务处理服务
      └────────────────┘

二、Nginx stream 配置(TCP 一致性哈希)

stream {
    upstream iot_gateway {
        hash$remote_addr consistent;
        server 192.168.1.10:9000 max_fails=3 fail_timeout=30s;
        server 192.168.1.11:9000 max_fails=3 fail_timeout=30s;
        server 192.168.1.12:9000 max_fails=3 fail_timeout=30s;
    }

    server {
        listen 9000; # 对外 TCP 接口
        proxy_connect_timeout 5s;
        proxy_timeout 600s;  # 长连接
        proxy_pass iot_gateway;
    }
}
  • hash $remote_addr consistent; 保证相同设备 IP 落到同一网关。

  • stream 模式支持 TCP 转发(Netty、MQTT、JT808)。

  • proxy_timeout 设置大一些,保证长连接不被 Nginx 中断。


三、Netty 网关 + Redis + Kafka 代码示例

1. ConnectionManager.java

@Component
public class ConnectionManager {
    private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();
    private final StringRedisTemplate redisTemplate;

    @Autowired
    public ConnectionManager(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void addConnection(String deviceId, Channel channel, String gatewayIp) {
        deviceChannels.put(deviceId, channel);
        // 写入 Redis,TTL 设置为 2 分钟心跳自动续期
        redisTemplate.opsForValue().set("device:gateway:" + deviceId, gatewayIp, 2, TimeUnit.MINUTES);
        // 添加心跳检测
        channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));
    }

    public void removeConnection(String deviceId) {
        Channel ch = deviceChannels.remove(deviceId);
        if (ch != null) {
            redisTemplate.delete("device:gateway:" + deviceId);
            ch.close();
        }
    }

    public Optional<String> getGatewayForDevice(String deviceId) {
        return Optional.ofNullable(redisTemplate.opsForValue().get("device:gateway:" + deviceId));
    }
}

2. Netty ChannelInitializer

public class IoTChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final ConnectionManager connectionManager;
    private final String gatewayIp;

    public IoTChannelInitializer(ConnectionManager cm, String gatewayIp) {
        this.connectionManager = cm;
        this.gatewayIp = gatewayIp;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
        p.addLast(new LengthFieldPrepender(2));
        p.addLast(new StringDecoder(StandardCharsets.UTF_8));
        p.addLast(new StringEncoder(StandardCharsets.UTF_8));
        p.addLast(new SimpleChannelInboundHandler<String>() {
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                // 假设第一条消息是 deviceId
                ctx.channel().attr(AttributeKey.valueOf("deviceId")).set("UNKNOWN");
            }

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                // 设备首次上报 ID
                if (ctx.channel().attr(AttributeKey.valueOf("deviceId")).get().equals("UNKNOWN")) {
                    String deviceId = msg.trim();
                    ctx.channel().attr(AttributeKey.valueOf("deviceId")).set(deviceId);
                    connectionManager.addConnection(deviceId, ctx.channel(), gatewayIp);
                    System.out.println("设备上线:" + deviceId + " → " + gatewayIp);
                } else {
                    // 业务数据转 Kafka
                    kafkaTemplate.send("iot.data", msg);
                }
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                String deviceId = (String) ctx.channel().attr(AttributeKey.valueOf("deviceId")).get();
                connectionManager.removeConnection(deviceId);
                System.out.println("设备下线:" + deviceId);
            }
        });
    }
}

四、Redis 数据示例

key

value

TTL

device:gateway:ABC123456

192.168.1.10

120 s

device:gateway:XYZ987654

192.168.1.11

120 s

device:gateway:LMN654321

192.168.1.12

120 s

  • 设备上线时记录当前连接的网关 IP;

  • 网关每次收到心跳包时续期 TTL;

  • 网关宕机时,该设备的 Redis Key 不会续期,自动过期后,Nginx 会将连接路由到新的可用网关。


五、Kafka 消费示例(后端业务服务)

@KafkaListener(topics = "iot.data", groupId = "iot-processor")
public void processIoTMessage(String message) {
    // 处理设备上报的数据
    System.out.println("收到 IoT 数据: " + message);
}

六、宕机平滑迁移流程

  1. 设备与网关保持心跳(TTL 续期)。

  2. 网关宕机 → 心跳停止 → Redis key 过期。

  3. Nginx 检测到连接断开 → 一致性哈希重新路由到新网关。

  4. 新网关接管设备连接 → 写入新的 deviceId → gatewayIP 映射。

  5. Kafka 消息处理不受影响(数据消费是分布式的)。


网站公告

今日签到

点亮在社区的每一天
去签到