目录
🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!
🌟了解 Netty 请看 : 【Netty篇】幽默的讲解带你入门 Netty !建议收藏
其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏(已完结)】…等
如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning
准备好迎接一场别开生面的流水线派对了吗?😄 今天我们要用幽默风趣的方式来聊聊 Netty 中的 Handler 和 Pipeline,它们在 Netty 中究竟扮演什么角色,又如何协同配合,把数据“打造成产品”。
了解 Netty 的 Channel 请看:【Netty篇】Channel 详解
了解 Netty 的 Future & Promise 请看:【Netty篇】Future & Promise 详解
一、 Handler & Pipeline——流水线上的“特种部队”与“生产线”
1、 ChannelHandler —— 流水线上的“特种兵”👮♂️
在 Netty 中,每个 ChannelHandler 就像一名训练有素的特种兵,负责处理 Channel 上的各种事件。它们可以分为两大阵营:
入站处理器(Inbound Handler)
通常继承自ChannelInboundHandlerAdapter
,专门负责处理从网络上“进厂”的原材料——比如读取客户端发送的数据、进行解码、执行业务逻辑,并在处理完毕后调用ctx.fireChannelRead(msg)
把数据往后传递。出站处理器(Outbound Handler)
通常继承自ChannelOutboundHandlerAdapter
,主要负责把数据“打包出厂”,对写回网络的数据进行编码、记录日志等处理。它们的调用顺序与入站处理器正好相反(逆序执行)。
简单来说,当数据在流水线上运动时,入站 Handler 就像一道道工序,在数据“进厂”时依次对它进行处理;而当数据需要“出厂”发往客户端时,则经过一系列出站 Handler 的“最后打扮”,保证成品美美哒地呈现!🍔
2、 ChannelPipeline —— 生产线上的“接力赛跑”🏃♀️🏃♂️
ChannelPipeline 则是把所有这些 Handler 组织在一起的数据处理链,它本质上就是一个双向链表,每个节点是一个包装了 ChannelHandler 的 ChannelHandlerContext。在这条流水线上:
- 入站数据 按添加顺序(从头到尾)依次经过每个 Handler,就像在接力赛中顺次把接力棒传给下一个队友;
- 出站数据 则会以相反的顺序(从尾到头)经过所有出站 Handler,就像在倒着跑的接力赛中,把数据从尾端“传回”前端进行最后处理。
光看的话比较单调,直接上代码,我们来详细解读代码和顺序吧!
二、 代码实例
1、 服务端代码示例
下面这段代码在服务端注册了 3 个入站 Handler 和 3 个出站 Handler。我们来看一下每个 Handler 的作用和执行流程。
public class NetServer {
public static void main(String[] args) {
// 配置服务器引导类,用于启动和绑定网络服务器
new ServerBootstrap()
// 设置事件循环组,处理I/O操作
.group(new NioEventLoopGroup())
// 指定通道类型为NIO服务器Socket通道
.channel(NioServerSocketChannel.class)
// 设置通道初始化器,用于配置管道处理逻辑
.childHandler(new ChannelInitializer<NioSocketChannel>() {
/**
* 初始化通道,设置通道的管道处理器
* @param ch 要初始化的通道
*/
protected void initChannel(NioSocketChannel ch) {
// 添加入站处理器,处理接收到的消息
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("这是第 1 道入站工序");
// 将消息传递给管道中的下一个入站处理器
ctx.fireChannelRead(msg); // 1
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("这是第 2 道入站工序");
// 将消息传递给管道中的下一个入站处理器
ctx.fireChannelRead(msg); // 2
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("这是第 3 道入站工序");
// 将消息写入管道,准备发送给客户端
ctx.channel().write(msg); // 3
}
});
// 添加出站处理器,处理待发送的消息
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println("这是第 1 道出站工序");
// 继续处理写操作,将消息写入管道
ctx.write(msg, promise); // 4
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println("这是第 2 道出站工序");
// 继续处理写操作,将消息写入管道
ctx.write(msg, promise); // 5
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println("这是第 3 道出站工序");
// 继续处理写操作,将消息写入管道
ctx.write(msg, promise); // 6
}
});
}
})
// 绑定端口8080,启动服务器
.bind(8080);
}
}
2、 客户端代码示例
public class NetClient {
public static void main(String[] args) {
// 配置客户端
new Bootstrap()
// 指定事件循环组,处理I/O操作
.group(new NioEventLoopGroup())
// 指定通道类型
.channel(NioSocketChannel.class)
// 设置通道初始化器,配置管道处理器
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
// 添加字符串编码器到管道,以便发送字符串消息
ch.pipeline().addLast(new StringEncoder());
}
})
// 连接到指定的主机和端口
.connect("127.0.0.1", 8080)
// 添加监听器,在连接成功后执行操作
.addListener((ChannelFutureListener) future -> {
// 发送消息到服务器
future.channel().writeAndFlush("hello,world");
});
}
}
分别运行服务端和客户端,查看运行结果
3、 执行顺序详解
入站处理器(Inbound)
数据刚刚从客户端传入时,会依次触发三个入站 Handler:- 第一个 Handler 打印
这是第 1 道入站工序
,调用ctx.fireChannelRead(msg)
传给下一个 Handler。 - 第二个 Handler 打印
这是第 2 道入站工序
,同样调用ctx.fireChannelRead(msg)
。 - 第三个 Handler打印
这是第 3 道入站工序
,然后调用ctx.channel().write(msg)
触发出站处理器。
- 第一个 Handler 打印
出站处理器(Outbound)
当第三个入站 Handler 调用ctx.channel().write(msg)
时,它开始从 Pipeline 尾部查找出站处理器,并依次执行:- 最后一个出站 Handler(Handler6)打印
这是第 3 道出站工序
,调用ctx.write(msg, promise)
将消息交给前一个出站 Handler; - 然后 Handler5 打印
这是第 2 道出站工序
; - 接着 Handler4 打印
这是第 1 道出站工序
。
完成后消息会真正被写出到 Channel 中。这里的关键是:出站 Handler 的执行顺序与它们添加的顺序相反,也就是从尾部开始触发。
- 最后一个出站 Handler(Handler6)打印
输出流程图
这就完美地展现了入站与出站的“正序与逆序”运行机制。😎
补充说明:ctx.channel().write(msg) vs ctx.write(msg)
ctx.channel().write(msg)
从整个 Pipeline 的尾部开始查找出站处理器,这意味着总是会从最新添加的出站 Handler 开始执行。ctx.write(msg)
则从当前 Handler 的上一个出站 Handler 开始触发,如果在当前节点之前没有出站 Handler,则不会触发出站逻辑。
例如,在第三个入站 Handler中使用 ctx.channel().write(msg)
能确保出站 Handler 全部触发;如果改成 ctx.write(msg)
,而节点3自身后没有出站处理器,则可能只触发部分或不触发任何出站 Handler。
顺序模拟
上面的大致执行顺序你应该了解了,接下来实战一下看看具体效果是否真的是符合我们的想法。我已经在上面服务端代码标记好了需要操作的代码行,标记了 1、2、3、4、5、6,你需要先找到:
首先我将 1 处的代码注释掉,查看运行结果:
可以看到只打印了第一道工序,后面的都没有打印,接下来我把 1 处的代码注释回来,然后注释 2 处的代码,查看运行结果:
由输出结果咱可以得出结论:如果要调用下一个入站处理器,就必须使用 ctx.fireChannelRead(msg)
向下传递,出站也是同样的,你们可以自行的去试一下,比如注释掉 5
或者 6
来看一下输出结果。
注意 : 你如果将
3
处的ctx.channel().write(msg)
误写成了ctx.write(msg)
,仅会打印 如下结果,因为节点3
之前没有其它出站处理器了
6
处的ctx.write(msg, promise)
误写成了ctx.channel().write(msg)
会打印如下结果, 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6
自己
三、 总结:特种兵与流水线的完美协作🎉
- ChannelHandler 就像车间里的各路特种兵,不同的 Handler 负责不同的加工任务——有的负责“进料”(入站),有的负责“出货”(出站)。
- ChannelPipeline 则是连接这些特种兵的生产线,就像流水线一样确保数据能够逐个被加工、转换后顺利送出。
- 执行顺序
- 入站:按照 Handler 添加的先后顺序依次处理(正序)。
- 出站:按照 Handler 添加的逆序处理(倒序)。
这一切就好像是一场精心设计的接力赛,数据作为接力棒在各个 Handler 之间传递、加工,最终将高质量的产品“送达”客户端!🍔🚀
希望这份幽默而详细的讲解能让你对 Netty 的 Handler 和 Pipeline 有全新的认识。😄