Netty学习

发布于:2025-09-13 ⋅ 阅读:(22) ⋅ 点赞:(0)

是什么

想象一下你要在两个城市之间建立一条高效的物流系统:

  • 传统方式(BIO):每来一件货物(一个请求),我就派一辆卡车和一个司机专门负责这趟运输。司机要一直等到货物装完、运输、卸完才能回来接下一单。如果货物没到,司机就得空等,非常浪费资源。
  • Netty 方式(NIO):我建立一个智能物流中心(Netty)。这个中心有一个总调度员(Reactor 线程),他负责盯着所有货车的状态(连接通道是否就绪)。一旦某辆货车的货物准备好了(数据就绪),调度员就通知一个空闲的装卸工(Worker 线程)去处理这辆车的装卸。这样,少量的司机和装卸工就能处理海量的货物。

结论:Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能服务器和客户端。它极大地简化了网络编程,让你不用从底层 Socket 开始苦苦折腾。

入门

基于 Netty 框架的简单客户端 - 服务器通信功能

基本目录:

在这里插入图片描述

其中:

java/
└── com/
    └── netty/
        ├── NettyClient.java       // 客户端启动类
        ├── NettyServer.java       // 服务端启动类
        ├── client/                // 客户端相关组件
        │   ├── ClientInitializer.java  // 客户端 Channel 初始化器
        │   └── ClientHandler.java      // 客户端业务处理器
        └── server/                // 服务端相关组件
            ├── ServerInitializer.java  // 服务端 Channel 初始化器
            └── ServerHandler.java      // 服务端业务处理器

服务器功能

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 此时msg已经被解码器转为String,无需手动释放
        String in = (String) msg;
        System.out.println("Server 接收: " + in);

        // 直接发送字符串,编码器会自动转为ByteBuf
        String response = "Hello from server!";
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 添加字符串解码器
        pipeline.addLast(new StringDecoder());
        // 添加字符串编码器
        pipeline.addLast(new StringEncoder());
        // 添加自定义的处理器
        pipeline.addLast(new ServerHandler());
    }

}
public class NettyServer  {
    public static void main(String[] args)  throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();//处理连接请求
        EventLoopGroup workerGroup = new NioEventLoopGroup();//处理读写事件

       try {
            // 创建服务器启动引导类
            ServerBootstrap b = new ServerBootstrap();
            // 配置服务器参数
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)          // 使用NIO传输通道
             .childHandler(new ServerInitializer());         // 设置业务处理器

            // 绑定端口并启动服务器
            ChannelFuture f = b.bind(8080).sync();
            System.out.println("Server started on port 8080");

            // 等待服务器通道关闭
            f.channel().closeFuture().sync();
        } finally {
            // 关闭EventLoopGroup
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}
  • 监听本地 8080 端口
  • 接收客户端发送的字符串消息并打印
  • 收到消息后,向客户端返回 “Hello from server!” 响应

客户端功能

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 直接发送字符串,编码器会自动转为ByteBuf
        String msg = "Hello from client!";
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 此时msg已经被解码器转为String
        String in = (String) msg;
        System.out.println("Client 接收: " + in);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        pipeline.addLast(new ClientHandler());
    }
}
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        // 创建EventLoopGroup实例
        EventLoopGroup group = new NioEventLoopGroup();  // 处理I/O操作和业务逻辑

        try {
            // 创建客户端启动引导类
            Bootstrap b = new Bootstrap();
            // 配置客户端参数
            b.group(group)
             .channel(NioSocketChannel.class)           // 使用NIO套接字通道
             .handler(new ClientInitializer());         // 设置业务处理器

            // 连接服务器
            ChannelFuture f = b.connect("localhost", 8080).sync();
            
            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅关闭EventLoopGroup
            group.shutdownGracefully();
        }
    }
}
  • 连接本地 8080 端口的服务器
  • 连接成功后向服务器发送 “Hello from client!” 消息
  • 接收服务器返回的响应消息并打印
  • 完成通信后关闭连接

时间线:

  1. 客户端连接服务器 (TCP握手完成)
  2. 客户端 channelActive() 触发 → 发送 “Hello from client!”
  3. 服务器收到数据 → channelRead() 触发 → 发送 “Hello from server!”
  4. 客户端收到响应 → channelRead() 触发 → 关闭连接

在这里插入图片描述

在这里插入图片描述

在这其中,Netty起作用如下:

代替原生 Java NIO:

// Netty 封装了复杂的 NIO 操作
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)  // Netty 提供的 NIO 通道实现
 .handler(new ClientInitializer());

提供现成的编解码器:

pipeline.addLast(new StringDecoder());  // 字节 → 字符串
pipeline.addLast(new StringEncoder());  // 字符串 → 字节

事件模型:

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 连接建立时自动回调
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到数据时自动回调
    }
}

管道机制:

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());    // 处理器1:解码
pipeline.addLast(new StringEncoder());    // 处理器2:编码  
pipeline.addLast(new ServerHandler());    // 处理器3:业务逻辑

ByteBuf:

// 底层使用 Netty 的高效缓冲区
// StringEncoder 自动将 String 转换为优化的 ByteBuf
ctx.writeAndFlush(response);

自动处理:

  • TCP 连接的建立和维护
  • 连接的超时和重连
  • 优雅的关闭和资源释放
// Netty 自动管理连接生命周期
ChannelFuture f = b.bind("localhost", 8080).sync();
f.channel().closeFuture().sync();

感谢阅读>W<