以下是一个完整的Netty + Protobuf实际案例,包含服务端和客户端实现:
步骤1:定义Protobuf协议
创建message.proto
文件:
syntax = "proto3";
option java_package = "com.example.netty.protobuf";
option java_outer_classname = "MessageProto";
message Message {
int32 id = 1;
string content = 2;
MessageType type = 3;
enum MessageType {
REQUEST = 0;
RESPONSE = 1;
}
}
生成Java类:
protoc --java_out=./src/main/java message.proto
步骤2:服务端实现
// Server.java
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 解码器
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
// 编码器
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
// 业务处理器
pipeline.addLast(new ServerHandler());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
// ServerHandler.java
public class ServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProto.Message msg) {
System.out.println("Server received: " + msg.getContent());
// 构建响应
MessageProto.Message response = MessageProto.Message.newBuilder()
.setId(msg.getId())
.setContent("Response to: " + msg.getContent())
.setType(MessageProto.Message.MessageType.RESPONSE)
.build();
ctx.writeAndFlush(response);
}
}
步骤3:客户端实现
// Client.java
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("localhost", 8888).sync();
// 发送消息
MessageProto.Message request = MessageProto.Message.newBuilder()
.setId(1)
.setContent("Hello Server")
.setType(MessageProto.Message.MessageType.REQUEST)
.build();
f.channel().writeAndFlush(request);
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
// ClientHandler.java
public class ClientHandler extends SimpleChannelInboundHandler<MessageProto.Message> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProto.Message msg) {
System.out.println("Client received: " + msg.getContent());
}
}
关键点说明
- 编解码器链配置:
-
ProtobufVarint32FrameDecoder
:处理TCP粘包/拆包ProtobufDecoder
:将字节流转换为Protobuf对象ProtobufVarint32LengthFieldPrepender
:添加长度前缀ProtobufEncoder
:将Protobuf对象序列化为字节流
- 消息流转:
客户端发送: Protobuf对象 → 编码 → 字节流 → 网络传输
服务端接收: 字节流 → 解码 → Protobuf对象 → 业务处理
- 性能优化:
-
- 复用Protobuf对象实例(
getDefaultInstance()
) - 使用
Unpooled
直接内存分配(示例中Netty已自动优化)
- 复用Protobuf对象实例(
运行测试
- 先启动服务端
- 再启动客户端
- 观察控制台输出:
Server received: Hello Server
Client received: Response to: Hello Server
扩展建议
- 添加SSL加密:
pipeline.addFirst(new SslHandler(sslEngine));
- 使用
LengthFieldBasedFrameDecoder
替代Protobuf自带拆包器 - 结合
ByteToMessageCodec
实现复合消息处理
这个示例完整展示了Netty与Protobuf的整合,实现了二进制数据的高效传输。实际生产环境中可根据需要添加心跳机制、重连策略等扩展功能。