【Netty系列】Protobuf编码解码:客户端、服务端

发布于:2025-06-02 ⋅ 阅读:(28) ⋅ 点赞:(0)

以下是一个完整的Netty + Protobuf实际案例,包含服务端和客户端实现:


步骤1:定义Protobuf协议

创建message.proto文件:

syntax = "proto3";

option java_package = "com.example.netty.protobuf";
option java_outer_classname = "MessageProto";

message Message {
  int32 id = 1;
  string content = 2;
  MessageType type = 3;

  enum MessageType {
    REQUEST = 0;
    RESPONSE = 1;
  }
}

生成Java类:

protoc --java_out=./src/main/java message.proto

步骤2:服务端实现

// Server.java
public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
      
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     // 解码器
                     pipeline.addLast(new ProtobufVarint32FrameDecoder());
                     pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
                     // 编码器
                     pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                     pipeline.addLast(new ProtobufEncoder());
                     // 业务处理器
                     pipeline.addLast(new ServerHandler());
                 }
             });

            ChannelFuture f = b.bind(8888).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

// ServerHandler.java
public class ServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProto.Message msg) {
        System.out.println("Server received: " + msg.getContent());
      
        // 构建响应
        MessageProto.Message response = MessageProto.Message.newBuilder()
                .setId(msg.getId())
                .setContent("Response to: " + msg.getContent())
                .setType(MessageProto.Message.MessageType.RESPONSE)
                .build();
      
        ctx.writeAndFlush(response);
    }
}

步骤3:客户端实现

// Client.java
public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
      
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast(new ProtobufVarint32FrameDecoder());
                     pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
                     pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                     pipeline.addLast(new ProtobufEncoder());
                     pipeline.addLast(new ClientHandler());
                 }
             });

            ChannelFuture f = b.connect("localhost", 8888).sync();
          
            // 发送消息
            MessageProto.Message request = MessageProto.Message.newBuilder()
                    .setId(1)
                    .setContent("Hello Server")
                    .setType(MessageProto.Message.MessageType.REQUEST)
                    .build();
          
            f.channel().writeAndFlush(request);
          
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

// ClientHandler.java
public class ClientHandler extends SimpleChannelInboundHandler<MessageProto.Message> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProto.Message msg) {
        System.out.println("Client received: " + msg.getContent());
    }
}

关键点说明

  1. 编解码器链配置
    • ProtobufVarint32FrameDecoder:处理TCP粘包/拆包
    • ProtobufDecoder:将字节流转换为Protobuf对象
    • ProtobufVarint32LengthFieldPrepender:添加长度前缀
    • ProtobufEncoder:将Protobuf对象序列化为字节流
  1. 消息流转
客户端发送: Protobuf对象 → 编码 → 字节流 → 网络传输
服务端接收: 字节流 → 解码 → Protobuf对象 → 业务处理
  1. 性能优化
    • 复用Protobuf对象实例(getDefaultInstance()
    • 使用Unpooled直接内存分配(示例中Netty已自动优化)

运行测试

  1. 先启动服务端
  2. 再启动客户端
  3. 观察控制台输出:
Server received: Hello Server
Client received: Response to: Hello Server

扩展建议

  1. 添加SSL加密:
pipeline.addFirst(new SslHandler(sslEngine));
  1. 使用LengthFieldBasedFrameDecoder替代Protobuf自带拆包器
  2. 结合ByteToMessageCodec实现复合消息处理

这个示例完整展示了Netty与Protobuf的整合,实现了二进制数据的高效传输。实际生产环境中可根据需要添加心跳机制、重连策略等扩展功能。


网站公告

今日签到

点亮在社区的每一天
去签到