说明
Netty 封装了 Java NIO 的很多功能,大大简化了 Java 网络编程的难度,同时 Netty 也支持多种协议,Netty 架构图如下
注:上图来自 Netty 官网
BIO 模型
传统的Java BIO模型代码如下
客户端代码
import java.net.Socket; import java.util.Date; /** * 传统 BIO 的客户端实现 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since 1.1 */ public class IOClient { public static void main(String[] args) { new Thread(() -> { try { Socket socket = new Socket("127.0.0.1", 8000); while (true) { try { socket.getOutputStream().write((new Date() + ": hello world").getBytes()); Thread.sleep(2000); } catch (Exception e) { } } } catch (Exception e) { } }).start(); } }
服务端代码
package bio; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; /** * 传统 BIO 的 服务端实现 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since 1.1 */ public class IOServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8000); new Thread(() -> { while (true) { try { // 阻塞 Socket socket = serverSocket.accept(); new Thread(() -> { try { int len; byte[] data = new byte[1024]; InputStream inputStream = socket.getInputStream(); // 按照字节流的方式读取数据 while ((len = inputStream.read(data)) != -1) { System.out.println(new String(data, 0, len)); } } catch (IOException e) { } }).start(); } catch (IOException e) { } } }).start(); } }
上述代码比较直白,缺点也很明显
每个连接创建成功后都需要由一个线程来维护,同一时刻有大量线程处于阻塞状态,此外,线程数量太多,也会导致操作系统频繁进行线程切换,使得应用性能下降。
NIO 模型
为了解决 BIO 的问题,引入了 NIO,即:一个新的连接来了以后,不会创建一个while 死循环取监听有数据可读,而是直接把这条连接注册到 Selector 上。然后,通过检查这个 Selector,就可以批量监测出有数据可读的连接,进而读取数据。
BIO读写是面向流的,一次性只能从流中读取一个字节或者多字节,并且读完之后流无法再读取,需要自己缓存数据。而 NIO 的读写是面向 Buffer 的,可以随意读取里面任何字节的数据,不需要自己缓存数据,只需要移动读写指针即可。
但是 Java 原生的 NIO 代码编程非常繁琐,一个简单的服务端代码,使用 NIO 模型,代码如下
package nio; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; /** * NIO 实现服务端 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since 1.4 */ public class NIOServer { public static void main(String[] args) throws Exception { Selector serverSelector = Selector.open(); Selector clientSelector = Selector.open(); new Thread(() -> { try { ServerSocketChannel listenerChannel = ServerSocketChannel.open(); listenerChannel.socket().bind(new InetSocketAddress(8000)); listenerChannel.configureBlocking(false); listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT); while (true) { if (serverSelector.select(1) > 0) { Set<SelectionKey> set = serverSelector.selectedKeys(); Iterator<SelectionKey> keyIterator = set.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { try { SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(clientSelector, SelectionKey.OP_READ); } finally { keyIterator.remove(); } } } } } } catch (Exception e) { } }).start(); new Thread(() -> { try { while (true) { if (clientSelector.select(1) > 0) { Set<SelectionKey> set = clientSelector.selectedKeys(); Iterator<SelectionKey> keyIterator = set.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isReadable()) { try { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); clientChannel.read(byteBuffer); byteBuffer.flip(); System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer).toString()); } finally { keyIterator.remove(); key.interestOps(SelectionKey.OP_READ); } } } } } } catch (Exception e) { } }).start(); } }
Netty 客户端和服务端
Netty 解决了 NIO 编程繁琐的痛点,封装了很多友好的 API,
同样实现服务端和客户端,如果使用 Netty,就简单很多
使用 Netty 实现一个最简单的服务端(每个组件使用见注释)
package netty.v1; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; /** * 使用 Netty 实现服务端 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since */ public class NettyServer { public static void main(String[] args) { // 引导服务端的启动 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 用于监听端口,接收新连接的线程组 NioEventLoopGroup boss = new NioEventLoopGroup(); // 表示处理每一个连接的数据读写的线程组 NioEventLoopGroup worker = new NioEventLoopGroup(); serverBootstrap.group(boss, worker) // 指定IO模型为NIO .channel(NioServerSocketChannel.class) // 定义后面每一个连接的数据读写 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { System.out.println("服务启动中......"); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }) // 本地绑定一个8000端口启动服务端 .bind(8000); } }
使用 Netty 实现一个最简单的客户端(每个组件说明见注释)
package netty.v1; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import java.util.Date; /** * Netty 实现客户端 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since */ public class NettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<>() { @Override protected void initChannel(Channel channel) { channel.pipeline().addLast(new StringEncoder()); } }); Channel channel = bootstrap.connect("localhost", 8000).channel(); while (true) { channel.writeAndFlush(new Date() + ": hello world!"); Thread.sleep(2000); } } }
注:运行上述代码需要引入 Netty 依赖包
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.80.Final</version> </dependency>
更复杂一点的场景
在 Netty 简单客户端和服务端基础上,增加一些更复杂的场景,比如:
服务端支持端口检测,即:针对已经被占用的端口,可以调整端口配置并自动绑定到一个空闲端口
客户端支持重连,即:设置一个最大重连次数,客户端允许多次重新连接服务端直到达到最大重连次数。
服务端代码如下(增加的配置参数见注释说明)
package netty.v2; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; /** * Netty 自动绑定递增端口 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since */ public class NettyServerBindDynamicPort { public static void main(String[] args) { // 引导服务端的启动 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 用于监听端口,接收新连接的线程组 NioEventLoopGroup boss = new NioEventLoopGroup(); // 表示处理每一个连接的数据读写的线程组 NioEventLoopGroup worker = new NioEventLoopGroup(); serverBootstrap.group(boss, worker) // 指定IO模型为NIO .channel(NioServerSocketChannel.class) // 可以给服务端的Channel指定一些属性,非必须 .attr(AttributeKey.newInstance("serverName"), "nettyServer") // 可以给每一个连接都指定自定义属性,非必须 .childAttr(AttributeKey.newInstance("clientKey"), "clientValue") // 使用option方法可以定义服务端的一些TCP参数 // 这个设置表示系统用于临时存放已经完成三次握手的请求的队列的最大长度, // 如果连接建立频繁,服务器创建新的连接比较慢,则可以适当调大这个参数 .option(ChannelOption.SO_BACKLOG, 1024) // 以下两个配置用于设置每个连接的TCP参数 // SO_KEEPALIVE: 表示是否开启TCP底层心跳机制,true表示开启 .childOption(ChannelOption.SO_KEEPALIVE, true) // TCP_NODELAY:表示是否开启Nagle算法,true表示关闭,false表示开启 // 如果要求高实时性,有数据发送时就马上发送,就设置为关闭; // 如果需要减少发送次数,减少网络交互,就设置为开启。 .childOption(ChannelOption.TCP_NODELAY, true) // 定义后面每一个连接的数据读写 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { System.out.println("服务启动中......"); } }); // 本地绑定一个8000端口启动服务 bind(serverBootstrap, 8000); } public static void bind(final ServerBootstrap serverBootstrap, final int port) { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { System.out.println("端口[" + port + "]绑定成功"); } else { System.err.println("端口[" + port + "]绑定失败"); bind(serverBootstrap, port + 1); } }); } }
其中 bind
方法是递归函数,即每次尝试失败的时候,端口号加1,直到端口绑定成功为止。
客户端代码如下(增加的配置参数见注释说明)
package netty.v2; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import java.util.Date; import java.util.concurrent.TimeUnit; /** * Netty 实现可自动重连的客户端 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2022/9/12 * @since */ public class NettyClientRetry { static final int MAX_RETRY = 6; public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap // 指定线程模型 .group(group) // 指定IO类型为NIO .channel(NioSocketChannel.class) // attr可以为客户端Channel绑定自定义属性 .attr(AttributeKey.newInstance("clientName"), "nettyClient") // 连接的超时时间,如果超过这个时间,仍未连接到服务端,则表示连接失败 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 表示是否开启TCP底层心跳机制,true表示开启 .option(ChannelOption.SO_KEEPALIVE, true) // 是否开启Nagle算法,如果要求高实时性,有数据就马上发送,则为true // 如果需要减少发送次数,减少网络交互,就设置为false .option(ChannelOption.TCP_NODELAY, true) // IO处理逻辑 .handler(new ChannelInitializer<>() { @Override protected void initChannel(Channel channel) { } }); connect(bootstrap, "localhost", 8000, MAX_RETRY); } private static void connect(final Bootstrap bootstrap, final String host, final int port, int retry) { bootstrap.connect(host, port).addListener(future -> { if (future.isSuccess()) { System.out.println("连接成功!"); } else if (retry == 0) { System.err.println("重试次数已经使用完毕"); } else { // 第几次重试 int order = (MAX_RETRY - retry) + 1; // 本次的重试间隔 int delay = 1 << order; System.out.println(new Date() + ": 连接失败,第" + order + "次重连..."); bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS); } }); } }
其中 connect
也是递归方法,每次尝试连接失败的时候, retry
参数减1,直到为0。但是在通常情况下,连接失败不会立即重连,而是通过一个指数退避的方式,即:delay 参数的配置,每隔 2 的幂次时间来建立连接。