基于Netty服务端快速了解核心组件

发布于:2024-07-27 ⋅ 阅读:(57) ⋅ 点赞:(0)

写在文章开头

由于Netty优秀的设计和封装,开发一个高性能网络程序就变得非常简单,本文从一个简单的服务端落地简单介绍一下Netty中的几个核心组件,希望对你有帮助。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

快速落地一个服务端

我们希望通过Netty快速落地一个简单的主从reactor模型,由主reactor对应的线程组接收连接交由acceptor创建连接,与之建立的客户端的读写事件都会交由从reactor对应的线程池处理:

在这里插入图片描述

基于此设计,我们通过Netty写下如下代码,可以看到我们做了如下几件事:

  1. 声明一个服务端创建引导类ServerBootstrap ,负责配置服务端及其启动工作。
  2. 声明主从reactor线程组,其中boss可以看作监听端口接收新连接的线程组,而work则是负责处理客户端数据读写的线程组。
  3. 基于上述线程池作为group的入参完成主从reactor模型创建。
  4. 通过channel函数指定server channeNioServerSocketChannel即采用NIO模型,而NioServerSocketChannel我们可以直接理解为serverSocket的抽象表示。
  5. 通过childHandler方法给引导设置每一个连接数据读写的处理器handler

最后调用bind启动服务端并通过addListener对连接结果进行异步监听:

public static void main(String[] args) {
        //1. 声明一个服务端创建引导类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //2. 声明主从reactor线程组
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());


        serverBootstrap.group(boss, worker)//3. 基于上述线程池创建主从reactor模型
                .channel(NioServerSocketChannel.class)//server channel采用NIO模型
                .childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客户端读写请求处理器到subreactor中
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 对于ChannelInboundHandlerAdapter,收到消息后会按照顺序执行即 A -> B->ServerHandler
                        ch.pipeline().addLast(new InboundHandlerA())
                                .addLast(new InboundHandlerB())
                                .addLast(new ServerHandler());

                        // 处理写数据的逻辑,顺序是反着的 B -> A
                        ch.pipeline().addLast(new OutboundHandlerA())
                                .addLast(new OutboundHandlerB())
                                .addLast(new OutboundHandlerC());

                        ch.pipeline().addLast(new ExceptionHandler());
                    }
                });
        //绑定8080端口并设置回调监听结果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("连接成功");
                    }
                });
    }

对于客户端的发送的数据,我们都会通过ChannelInboundHandlerAdapter添加顺序处理,就如代码所示我们的执行顺序为InboundHandlerA->InboundHandlerB->ServerHandler,对此我们给出InboundHandlerA的代码,InboundHandlerB内容一样就不展示了:

public class InboundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA : " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8));
        //将当前的处理过的msg转交给pipeline的下一个ChannelHandler
        super.channelRead(ctx, msg);
    }
}

ServerHandler的则是:

  1. 客户端与服务端建立连接,对应客户端channel被激活,触发channelActive方法。
  2. ChannelHandlerContextChannel 已注册到其 EventLoop 中,执行channelRegistered
  3. ChannelHandler 添加到实际上下文并准备好处理事件后调用。
  4. 解析客户端的数据然后回复Hello Netty client
private static class ServerHandler extends ChannelInboundHandlerAdapter {

		@Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channel被激活,执行channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("执行channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("执行handlerAdded");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            //打印读取到的数据
            System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(StandardCharsets.UTF_8));

            // 回复客户端数据
            System.out.println(new Date() + ": 服务端写出数据");
            //组装数据并发送
            ByteBuf out = getByteBuf(ctx);
            ctx.channel().writeAndFlush(out);
            super.channelRead(ctx, msg);
        }

        private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
            ByteBuf buffer = ctx.alloc().buffer();

            byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);

            buffer.writeBytes(bytes);

            return buffer;
        }

     	//......
    }

我们通过telnet 对应ip和端口后发现服务端输出如下内容,也很我们上文说明的一致:

执行handlerAdded
执行channelRegistered
端口绑定成功,channel被激活,执行channelActive

然后我们发送消息 1,可以看到触发所有inboundchannelRead方法:

InBoundHandlerA : 1
InBoundHandlerB: 1
Wed Jul 24 00:05:18 CST 2024: 服务端读到数据 -> 1

然后我们回复hello netty client,按照添加的倒叙触发OutBoundHandler

Wed Jul 24 00:05:18 CST 2024: 服务端写出数据
OutBoundHandlerC: Hello Netty client 
OutBoundHandlerB: Hello Netty client 
OutBoundHandlerA: Hello Netty client 

详解Netty中的核心组件

channel接口

channel是Netty对于底层class socket中的bind、connect、read、write等原语的封装,简化了我们网络编程的复杂度,同时Netty也提供的各种现成的channel,我们可以根据个人需要自行使用。
下面便是笔者比较常用的Tcp或者UDP中比较常用的几种channel。

  1. NioServerSocketChannel:基于NIO选择器处理新连接。
  2. EpollServerSocketChannel:使用 linux EPOLL Edge 触发模式实现最大性能的实现。
  3. NioDatagramChannel:发送和接收 AddressedEnvelope 的 NIO 数据报通道。
  4. EpollDatagramChannel:使用 linux EPOLL Edge 触发模式实现最大性能的 DatagramChannel 实现。

EventLoop接口

Netty中,所有channel都会注册到某个eventLoop上, 每一个EventLoopGroup中有一个或者多个EventLoop,而每一个EventLoop都绑定一个线程,负责处理一个或者多个channel的事件:

在这里插入图片描述

这里我们也简单的给出NioEventLoop中的run方法,它继承自SingleThreadEventExecutor,我们可以大概看到NioEventLoop的核心逻辑本质就是轮询所有注册到NioEventLoop上的channel(socket的抽象)是否有其就绪的事件,然后

  @Override
    protected void run() {
        for (;;) {
            try {
            	//基于selectStrategy轮询查看是否有就绪事件
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
						//......

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                //根据IO配比执行网络IO事件方法processSelectedKeys以及其他事件方法runAllTasks
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
           //......
        }
    }

pipeline和channelHandler以channelHandlerContext

每一个channel的事件都会交由channelHandler处理,而负责同一个channelchannelHandler都会交由pipeline一条逻辑链进行连接,这两者的的关系都会一一封装成channelHandlerContextchannelHandlerContext主要是负责当前channelHandler和与其同一条channelpipeline上的其他channelHandler之间的交互。

举个例子,当我们收到客户端的写入数据时,这些数据就会交由pipeline上的channelHandler处理,如下图所示,从第一个channelHandler处理完成之后,每个channelHandlerContext就会将消息转交到当前pipeline的下一个channelHandler处理:

在这里插入图片描述

假设我们的channelHandler执行完ChannelActive后,如希望继续传播则会调用fireChannelActive

 @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口绑定成功,channel被激活,执行channelActive");
            ctx.fireChannelActive()
        }

查看其内部逻辑即可知晓,它就是通过AbstractChannelHandlerContext得到pipeline的下一个ChannelHandler并执行其channelActive方法:

 @Override
    public ChannelHandlerContext fireChannelActive() {
        final AbstractChannelHandlerContext next = findContextInbound();
        invokeChannelActive(next);
        return this;
    }

回调的思想

我们可以说回调其实是一种设计思想,Netty对于连接或者读写操作都是异步非阻塞的,所以我们希望在连接被建立进行一些响应的处理,那么Netty就会在连接建立的时候方法暴露一个回调方法供用户实现个性逻辑。

例如我们的channel连接被建立时,其底层就会调用invokeChannelActive获取我们绑定的ChannelInboundHandler并执行其channelActive方法:

private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

于是就会调用到我们服务端ServerHandlerchannelActive方法:

private static class ServerHandler extends ChannelInboundHandlerAdapter {

        //......

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口绑定成功,channel被激活,执行channelActive");
        }
		//......
}

Future异步监听

为保证网络服务器执行的效率,Netty大部分网络IO操作都采用异步的,以笔者建立连接设置的监听器为例,当前连接成功后,就会返回给监听器一个java.util.concurrent.Future,我们就可以通过这个f获取连接的结果是否成功:

//绑定8080端口并设置回调监听结果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("连接成功");
                    }
                });

我们步入DefaultPromiseaddListener即可发现其内部就是添加监听后判断这个连接的异步任务Future是否完成,如果完成调用notifyListeners回调我们的监听器的逻辑:

@Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        //......
		//添加监听
        synchronized (this) {
            addListener0(listener);
        }
		//连接任务完成,通知监听器
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

小结

以上便是笔者对于Netty服务端的基础使用和分析,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

高性能网络编程之 Reactor 网络模型(彻底搞懂):https://blog.csdn.net/ldw201510803006/article/details/124365838

《Netty in action》

《跟闪电侠学Netty》


网站公告

今日签到

点亮在社区的每一天
去签到