使用 Netty 实现 TCP 私有协议(解决粘包/拆包)

发布于:2025-06-26 ⋅ 阅读:(17) ⋅ 点赞:(0)

        Netty 是一个高性能、异步、事件驱动的网络框架,非常适合用于构建 TCP 通信中的私有协议。相比原生 Java Socket,Netty 提供了更简洁、更高效的粘包/拆包处理机制,下面案例通过使用 LengthFieldBasedFrameDecoder 自动完成数据包的解析。

        例如,我们采用的私有协议格式如下:

[消息长度(4字节)][消息内容]

代码示例:

        1、服务端代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) {
                            ChannelPipeline pipeline = ch.pipeline();

                            // 自动处理粘包/拆包
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(
                                    Integer.MAX_VALUE, 0, 4, 0, 4));

                            // 解码和编码字符串
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

                            // 业务逻辑处理
                            pipeline.addLast(new ServerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("Server started on port 8888...");
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    static class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 此处进行业务处理,消息解密、鉴权、路由转发处理等等。。。
            String request = (String) msg;
            System.out.println("Received: " + request);

            // 回应客户端
            String response = "Echo: " + request;
            ctx.writeAndFlush(response);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

        2、客户端代码

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.StandardCharsets;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) {
                            ChannelPipeline pipeline = ch.pipeline();

                            // 添加编码器:自动在消息前添加4字节长度
                            pipeline.addLast(new LengthFieldPrepender(4));

                            // 解码和编码字符串
                            pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
                            pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));

                            // 业务逻辑处理
                            pipeline.addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            System.out.println("Connected to server...");

            // 发送多条消息模拟粘包/拆包
            String[] messages = {"Hello", "Hi", "Bye", "End"};
            for (String msg : messages) {
                future.channel().writeAndFlush(msg);
                Thread.sleep(100); // 模拟发送间隔
            }

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    static class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("Receive Server Response: " + msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

关键说明:

1、服务端粘包/拆包处理

Netty 的 LengthFieldBasedFrameDecoder 会自动处理以下情况:

        粘包:多个消息被合并接收 -> 按照长度字段正确拆分为多个消息。

        拆包:一个消息被分多次接收 -> 缓存未处理数据,直到读取完整消息。

new LengthFieldBasedFrameDecoder(
    Integer.MAX_VALUE,   // 最大帧长度
    0,                   // 长度字段偏移量
    4,                   // 长度字段占用字节数
    0,                   // 长度字段之后的偏移量
    4                    // 调整后的偏移量(跳过长度字段)
)

2、客户端发送消息长度处理

LengthFieldPrepender(4) 的作用:

        1)自动在每条消息前插入 4 字节的长度字段。

        2)长度字段表示的是消息内容的字节数,不包括长度字段本身。

假设客户端发送消息 "Hello"(5 字节):

        使用 LengthFieldPrepender(4) 后,Netty 会自动将消息封装为:

[0x00 0x00 0x00 0x05] + [Hello]

前 4 字节是长度字段(5 字节),后 5 字节是消息内容,总共发送 9 字节。

3、为什么选择 4 字节?

4 字节可以表示最大长度为 2^32 - 1(4294967295 字节),对于大多数场景来说足够。如果消息长度超过 4 字节能表示的范围,可以调整为 8 字节(LengthFieldPrepender(8))。


网站公告

今日签到

点亮在社区的每一天
去签到