【Netty篇】Handler & Pipeline 详解

发布于:2025-04-18 ⋅ 阅读:(43) ⋅ 点赞:(0)

在这里插入图片描述

🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!

🌟了解 Netty 请看 : 【Netty篇】幽默的讲解带你入门 Netty !建议收藏

其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏已完结)】…等

如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning

准备好迎接一场别开生面的流水线派对了吗?😄 今天我们要用幽默风趣的方式来聊聊 Netty 中的 Handler 和 Pipeline,它们在 Netty 中究竟扮演什么角色,又如何协同配合,把数据“打造成产品”。


了解 Netty 的 Channel 请看:【Netty篇】Channel 详解
了解 Netty 的 Future & Promise 请看:【Netty篇】Future & Promise 详解


一、 Handler & Pipeline——流水线上的“特种部队”与“生产线”

1、 ChannelHandler —— 流水线上的“特种兵”👮‍♂️

在 Netty 中,每个 ChannelHandler 就像一名训练有素的特种兵,负责处理 Channel 上的各种事件。它们可以分为两大阵营:

  • 入站处理器(Inbound Handler)
    通常继承自 ChannelInboundHandlerAdapter,专门负责处理从网络上“进厂”的原材料——比如读取客户端发送的数据、进行解码、执行业务逻辑,并在处理完毕后调用 ctx.fireChannelRead(msg) 把数据往后传递。

  • 出站处理器(Outbound Handler)
    通常继承自 ChannelOutboundHandlerAdapter,主要负责把数据“打包出厂”,对写回网络的数据进行编码、记录日志等处理。它们的调用顺序与入站处理器正好相反(逆序执行)。

简单来说,当数据在流水线上运动时,入站 Handler 就像一道道工序,在数据“进厂”时依次对它进行处理;而当数据需要“出厂”发往客户端时,则经过一系列出站 Handler 的“最后打扮”,保证成品美美哒地呈现!🍔

2、 ChannelPipeline —— 生产线上的“接力赛跑”🏃‍♀️🏃‍♂️

ChannelPipeline 则是把所有这些 Handler 组织在一起的数据处理链,它本质上就是一个双向链表,每个节点是一个包装了 ChannelHandler 的 ChannelHandlerContext。在这条流水线上:

  • 入站数据 按添加顺序(从头到尾)依次经过每个 Handler,就像在接力赛中顺次把接力棒传给下一个队友;
  • 出站数据 则会以相反的顺序(从尾到头)经过所有出站 Handler,就像在倒着跑的接力赛中,把数据从尾端“传回”前端进行最后处理。

光看的话比较单调,直接上代码,我们来详细解读代码和顺序吧!


二、 代码实例

1、 服务端代码示例

下面这段代码在服务端注册了 3 个入站 Handler 和 3 个出站 Handler。我们来看一下每个 Handler 的作用和执行流程。

public class NetServer {
    public static void main(String[] args) {
        // 配置服务器引导类,用于启动和绑定网络服务器
        new ServerBootstrap()
                // 设置事件循环组,处理I/O操作
                .group(new NioEventLoopGroup())
                // 指定通道类型为NIO服务器Socket通道
                .channel(NioServerSocketChannel.class)
                // 设置通道初始化器,用于配置管道处理逻辑
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    /**
                     * 初始化通道,设置通道的管道处理器
                     * @param ch 要初始化的通道
                     */
                    protected void initChannel(NioSocketChannel ch) {
                        // 添加入站处理器,处理接收到的消息
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                System.out.println("这是第 1 道入站工序");
                                // 将消息传递给管道中的下一个入站处理器
                                ctx.fireChannelRead(msg);     // 1
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                System.out.println("这是第 2 道入站工序");
                                // 将消息传递给管道中的下一个入站处理器
                                ctx.fireChannelRead(msg);     // 2
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                System.out.println("这是第 3 道入站工序");
                                // 将消息写入管道,准备发送给客户端
                                ctx.channel().write(msg);     // 3
                            }
                        });
                        // 添加出站处理器,处理待发送的消息
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg,
                                              ChannelPromise promise) {
                                System.out.println("这是第 1 道出站工序");
                                // 继续处理写操作,将消息写入管道
                                ctx.write(msg, promise);      // 4
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg,
                                              ChannelPromise promise) {
                                System.out.println("这是第 2 道出站工序");
                                // 继续处理写操作,将消息写入管道
                                ctx.write(msg, promise);     // 5
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg,
                                              ChannelPromise promise) {
                                System.out.println("这是第 3 道出站工序");
                                // 继续处理写操作,将消息写入管道
                                ctx.write(msg, promise);       // 6
                                
                            }
                        });
                    }
                })
                // 绑定端口8080,启动服务器
                .bind(8080);
    }
}

2、 客户端代码示例

public class NetClient {
    public static void main(String[] args) {
        // 配置客户端
        new Bootstrap()
                // 指定事件循环组,处理I/O操作
                .group(new NioEventLoopGroup())
                // 指定通道类型
                .channel(NioSocketChannel.class)
                // 设置通道初始化器,配置管道处理器
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        // 添加字符串编码器到管道,以便发送字符串消息
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接到指定的主机和端口
                .connect("127.0.0.1", 8080)
                // 添加监听器,在连接成功后执行操作
                .addListener((ChannelFutureListener) future -> {
                    // 发送消息到服务器
                    future.channel().writeAndFlush("hello,world");
                });
    }
}

分别运行服务端和客户端,查看运行结果

在这里插入图片描述

3、 执行顺序详解

  1. 入站处理器(Inbound)
    数据刚刚从客户端传入时,会依次触发三个入站 Handler:

    • 第一个 Handler 打印 这是第 1 道入站工序,调用 ctx.fireChannelRead(msg) 传给下一个 Handler。
    • 第二个 Handler 打印 这是第 2 道入站工序,同样调用 ctx.fireChannelRead(msg)
    • 第三个 Handler打印 这是第 3 道入站工序,然后调用 ctx.channel().write(msg) 触发出站处理器。
  2. 出站处理器(Outbound)
    当第三个入站 Handler 调用 ctx.channel().write(msg) 时,它开始从 Pipeline 尾部查找出站处理器,并依次执行:

    • 最后一个出站 Handler(Handler6)打印 这是第 3 道出站工序,调用 ctx.write(msg, promise) 将消息交给前一个出站 Handler;
    • 然后 Handler5 打印 这是第 2 道出站工序
    • 接着 Handler4 打印 这是第 1 道出站工序

    完成后消息会真正被写出到 Channel 中。这里的关键是:出站 Handler 的执行顺序与它们添加的顺序相反,也就是从尾部开始触发。

  3. 输出流程图

在这里插入图片描述

这就完美地展现了入站与出站的“正序与逆序”运行机制。😎

补充说明:ctx.channel().write(msg) vs ctx.write(msg)

  • ctx.channel().write(msg)
    从整个 Pipeline 的尾部开始查找出站处理器,这意味着总是会从最新添加的出站 Handler 开始执行。

  • ctx.write(msg)
    则从当前 Handler 的上一个出站 Handler 开始触发,如果在当前节点之前没有出站 Handler,则不会触发出站逻辑。

例如,在第三个入站 Handler中使用 ctx.channel().write(msg) 能确保出站 Handler 全部触发;如果改成 ctx.write(msg),而节点3自身后没有出站处理器,则可能只触发部分或不触发任何出站 Handler。

顺序模拟

上面的大致执行顺序你应该了解了,接下来实战一下看看具体效果是否真的是符合我们的想法。我已经在上面服务端代码标记好了需要操作的代码行,标记了 1、2、3、4、5、6,你需要先找到:

在这里插入图片描述

首先我将 1 处的代码注释掉,查看运行结果:

在这里插入图片描述

可以看到只打印了第一道工序,后面的都没有打印,接下来我把 1 处的代码注释回来,然后注释 2 处的代码,查看运行结果:

在这里插入图片描述

由输出结果咱可以得出结论:如果要调用下一个入站处理器,就必须使用 ctx.fireChannelRead(msg) 向下传递,出站也是同样的,你们可以自行的去试一下,比如注释掉 5 或者 6 来看一下输出结果

注意 : 你如果将 3 处的 ctx.channel().write(msg) 误写成了 ctx.write(msg),仅会打印 如下结果,因为节点 3 之前没有其它出站处理器了
在这里插入图片描述
6 处的 ctx.write(msg, promise) 误写成了 ctx.channel().write(msg) 会打印如下结果, 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点 6 自己
在这里插入图片描述


三、 总结:特种兵与流水线的完美协作🎉

  • ChannelHandler 就像车间里的各路特种兵,不同的 Handler 负责不同的加工任务——有的负责“进料”(入站),有的负责“出货”(出站)。
  • ChannelPipeline 则是连接这些特种兵的生产线,就像流水线一样确保数据能够逐个被加工、转换后顺利送出。
  • 执行顺序
    • 入站:按照 Handler 添加的先后顺序依次处理(正序)。
    • 出站:按照 Handler 添加的逆序处理(倒序)。

这一切就好像是一场精心设计的接力赛,数据作为接力棒在各个 Handler 之间传递、加工,最终将高质量的产品“送达”客户端!🍔🚀

希望这份幽默而详细的讲解能让你对 Netty 的 Handler 和 Pipeline 有全新的认识。😄


网站公告

今日签到

点亮在社区的每一天
去签到