Netty介绍和基本代码演示

发布于:2025-07-17 ⋅ 阅读:(19) ⋅ 点赞:(0)

什么是Netty?

Netty是一个基于Java NIO的异步事件驱动的网络应用框架,主要用于快速开发高性能、高可靠性的网络服务器和客户端程序。它简化了网络编程的复杂性,提供了丰富的协议支持,被广泛应用于各种高性能网络应用中。

为什么选择Netty?

  1. 高性能:基于NIO,支持异步非阻塞I/O
  2. 高并发:能够处理大量并发连接
  3. 易用性:提供了简洁的API,降低了网络编程的复杂度
  4. 可扩展性:模块化设计,易于扩展和定制
  5. 稳定性:经过大量生产环境验证

核心概念

Channel(通道)

Channel是Netty网络操作的基础,代表一个到实体的连接,如硬件设备、文件、网络套接字等。

EventLoop(事件循环)

EventLoop负责处理Channel上的I/O操作,一个EventLoop可以处理多个Channel。

ChannelHandler(通道处理器)

ChannelHandler处理I/O事件,如连接建立、数据读取、数据写入等。

Pipeline(管道)

Pipeline是ChannelHandler的容器,定义了ChannelHandler的处理顺序。

基本代码演示

1. Maven依赖配置
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.94.Final</version>
</dependency>
2. 简单的Echo服务器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // 创建boss线程组,用于接收连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 创建worker线程组,用于处理连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建服务器启动引导类
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加字符串编解码器
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // 添加自定义处理器
                            pipeline.addLast(new EchoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口并启动服务器
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Echo服务器启动成功,监听端口: " + port);

            // 等待服务器关闭
            future.channel().closeFuture().sync();
        } finally {
            // 优雅关闭线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8080).start();
    }
}
3. 服务器处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println("服务器收到消息: " + message);
        
        // 回显消息给客户端
        ctx.writeAndFlush("服务器回复: " + message + "\n");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接: " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开连接: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
4. 客户端实现
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new EchoClientHandler());
                        }
                    });

            // 连接服务器
            ChannelFuture future = bootstrap.connect(host, port).sync();
            Channel channel = future.channel();
            System.out.println("连接到服务器: " + host + ":" + port);

            // 从控制台读取输入并发送
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if ("quit".equals(line)) {
                    break;
                }
                channel.writeAndFlush(line + "\n");
            }

            // 关闭连接
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("localhost", 8080).start();
    }
}
5. 客户端处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println("客户端收到消息: " + message);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接成功");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

运行步骤

  1. 首先运行 EchoServer 类启动服务器
  2. 然后运行 EchoClient 类启动客户端
  3. 在客户端控制台输入消息,服务器会回显消息

运行EchoServer 输出:

09:53:01.748 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
09:53:01.755 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
09:53:01.780 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
09:53:01.780 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
09:53:01.820 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
09:53:01.821 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
09:53:01.822 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
09:53:01.823 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
09:53:01.823 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.storeFence: available
09:53:01.824 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
09:53:01.824 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
09:53:01.825 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
09:53:01.826 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\H-ZHON~1\AppData\Local\Temp (java.io.tmpdir)
09:53:01.826 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
09:53:01.827 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
09:53:01.830 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3678404608 bytes
09:53:01.830 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
09:53:01.832 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
09:53:01.832 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
09:53:01.833 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
09:53:01.833 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
09:53:01.849 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
09:53:02.186 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 2552 (auto-detected)
09:53:02.187 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
09:53:02.187 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
09:53:02.489 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
09:53:02.490 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
09:53:02.744 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:ff:86:ff:fe:50:58:66 (auto-detected)
09:53:02.760 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
09:53:02.760 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 9
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 4194304
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: false
09:53:02.786 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
09:53:02.796 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
09:53:02.796 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
09:53:02.796 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
Echo服务器启动成功,监听端口: 8080

运行 EchoClient,控制台输入消息, 输出:

09:55:09.541 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
09:55:09.547 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
09:55:09.568 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
09:55:09.568 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
09:55:09.597 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
09:55:09.597 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
09:55:09.599 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
09:55:09.599 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
09:55:09.600 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.storeFence: available
09:55:09.600 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
09:55:09.601 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
09:55:09.601 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
09:55:09.601 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
09:55:09.601 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
09:55:09.601 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
09:55:09.602 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\H-ZHON~1\AppData\Local\Temp (java.io.tmpdir)
09:55:09.602 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
09:55:09.603 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
09:55:09.605 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3678404608 bytes
09:55:09.605 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
09:55:09.606 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
09:55:09.607 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
09:55:09.607 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
09:55:09.608 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
09:55:09.618 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
09:55:10.051 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 21460 (auto-detected)
09:55:10.054 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
09:55:10.054 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
09:55:10.475 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
09:55:10.477 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
09:55:10.773 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:ff:86:ff:fe:50:58:66 (auto-detected)
09:55:10.786 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
09:55:10.787 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 9
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 4194304
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: false
09:55:10.813 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
09:55:10.821 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
09:55:10.821 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
09:55:10.822 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
连接到服务器: localhost:8080
客户端连接成功

09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.chunkSize: 32
09:56:56.659 [main] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.blocking: false
09:56:56.672 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
09:56:56.672 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
09:56:56.673 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@692bca4c
客户端收到消息: 服务器回复: 


hello
客户端收到消息: 服务器回复: hello


testNettyServer
客户端收到消息: 服务器回复: testNettyServer

代码解析

服务器端关键点:
  1. EventLoopGroup:创建两个线程组,bossGroup负责接收连接,workerGroup负责处理连接
  1. ServerBootstrap:服务器启动引导类,配置各种参数
  1. ChannelInitializer:初始化Channel,添加编解码器和处理器
  1. Pipeline:处理器链,定义了消息处理的顺序
客户端关键点:
  1. Bootstrap:客户端启动引导类
  1. 连接建立:通过connect方法连接到服务器
  1. 消息发送:通过Channel发送消息

实际应用建议

  1. 合理配置线程池大小:根据CPU核心数和业务特点调整

感谢阅读


网站公告

今日签到

点亮在社区的每一天
去签到