【图解IO与Netty系列】Netty源码解析——事件循环

发布于:2024-06-25 ⋅ 阅读:(168) ⋅ 点赞:(0)

Netty事件循环

当Netty服务端启动起来以后,就可以接受客户端发送的请求,接收到客户端发来的请求后就会有事件就绪,有事件就绪就会在Netty的事件循环中被监听到并处理,我们下面看看Netty事件循环的逻辑。

在这里插入图片描述

Netty中的NIO事件循环的代码位于NioEventLoop的run()方法内部,这个方法总体是一个for(;;)死循环,然后for循环里头依次执行的三个重要方法分别是:

  1. select():调用Selector#select()方法监听注册到Selector上的Channel,等待Channel关注的事件就绪。
  2. processSelectedKeys():处理就绪事件,会调用ChannelPipeline处理,ChannelPipeline又会通过责任链模式调用里面的ChannelHandler处理。
  3. runAllTasks():处理NioEventLoop中的taskQueue的异步任务。

这就是Netty事件循环的大体逻辑,下面我们进入代码解析。

源码解析

select()

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

在NioEventLoop的事件循环中,首先是select()方法的调用,select()方法里面就是调用NioEventLoop对应的Selector的select()方法,阻塞当前线程,监听注册到Selector的Channel,等待事件就绪。

在这里插入图片描述

当有事件就绪时,当前线程就会解阻塞,然后调用processSelectedKeys()方法处理就绪事件。

processSelectedKeys()

NioEventLoop的 processSelectedKeys()方法会进入processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            ...
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (...) {...}
    }

无论是accept事件,还是read事件时,都是调用unsafe.read()方法。

在这里插入图片描述

只是这里的Unsafe的类型有可能不一样,如果是NioServerSocketChannel的话,那么Unsafe的类型就是NioMessageUnsafe,是在创建NioServerSocketChannel时就创建好的,我们看一下NioMessageUnsafe的read()方法。

NioMessageUnsafe#read()

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            	...
                doReadMessages(readBuf);
				...
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    ...
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                ...
        }
    }

重要方法就两个doReadMessages(readBuf)和pipeline.fireChannelRead(readBuf.get(i)),其他代码全部省略不看。

在这里插入图片描述

我们先看doReadMessages(readBuf)

@Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());
		...
                buf.add(new NioSocketChannel(this, ch));
				...
    }

SocketUtils.accept(javaChannel())里面是调用ServerSocketChannel的accept()方法,返回一个SocketChannel。

然后new NioSocketChannel(this, ch)把返回的SocketChannel包装成NioSocketChannel,放入buf中,这个buf就是外面read()方法的readBuf。

在这里插入图片描述

NioSocketChannel的构造方法调用父类AbstractNioByteChannel的构造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

可以看到指定关注的事件类型是read事件,再看看AbstractNioByteChannel的父类AbstractNioChannel的构造方法

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (...) {...}
    }

继续调用父类的构造方法,然后保存了NioSocketChannel和关注的事件类型OP_READ,设置NioSocketChannel为非阻塞。

再看AbstractNioChannel的父类AbstractChannel的构造方法

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

创建一个Unsafe,然后初始化了ChannelPipeline。newUnsafe()方法会进入NioSocketChannel的newUnsafe()方法,创建的时NioSocketChannelUnsafe类型的Unsafe,因此NioSocketChannel的Unsafe类型就是NioSocketChannelUnsafe。

    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

NioSocketChannel的构造方法总结起来就是做了5件事:

  1. 创建并保存NioSocketChannelUnsafe
  2. 创建并保存ChannelPipeline
  3. 保存SocketChannel
  4. 保存关注的事件类型OP_READ
  5. 设置SocketChannel为非阻塞

在这里插入图片描述

然后回到unsafe.read()方法,接下来执行的代码pipeline.fireChannelRead(readBuf.get(i))就是调用触发就绪事件的Channel对应的ChannelPipeline的fireChannelRead(…)方法,触发ChannelPipeline中每个处理入站事件的ChannelHandler的入站事件处理。

ChannelPipeline的fireChannelRead(…)方法会从头到尾以责任链的处理方式调用每个ChannelInboundHandler类型的channelRead(…)方法。NioServerSocketChannel的ChannelPipeline中的ChannelHandler是ServerBootstrapAcceptor。我们看看ServerBootstrapAcceptor的channelRead(…)方法。

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {...});
            } catch (...) {...}
        }

这里的参数msg的类型是NioSocketChannel,就是上面放入buf中的NioSocketChannel,调用pipeline.fireChannelRead(readBuf.get(i))方法时readBuf.get(i)就是中buf中取出NioSocketChannel作为参数传进来。

然后child.pipeline().addLast(childHandler)这一行这里的childHandler是我们定义的ChannelInitializer,这里放入NioSocketChannel的ChannelPipeline中,当NioSocketChannel里面的SocketChannel被注册到Selector之后,会触发ChannelInitializer的调用,初始化ChannelPipeline。

然后childGroup.register(child)就是把这个NioSocketChannel注册到workerGroup中的其中一个NioEventLoop上,也就是把NioSocketChannel中的SocketChannel注册到workerGroup中的其中一个NioEventLoop的Selector上。

在这里插入图片描述

这里NioSocketChannel注册的细节跟NioServerSocketChannel的注册是一样的,上一篇文章已经分析过,这里就不重复了。

NioSocketChannel注册好之后,就可以接收客户端发来的数据,于是又有read事件触发,此时调用的unsafe.read(),就是NioSocketChannelUnsafe的父类NioByteUnsafe的read()方法了。

NioByteUnsafe#read()

        @Override
        public final void read() {
            		...
                    byteBuf = allocHandle.allocate(allocator);
                    doReadBytes(byteBuf);
					...
                    pipeline.fireChannelRead(byteBuf);
					...
    	}

就是通过allocator分配一个ByteBuf,然后把Channel中的数据读取到ByteBuf中,然后调用pipeline.fireChannelRead(byteBuf)触发ChannelPipeline的处理,然后ChannelPipeline中的ChannelHandler就会处理byteBuf中的数据,这里的ChannelPipeline中的ChannelHandler就是我们定义的ChannelInitializer组装到ChannelPipeline中的ChannelHandler了。

在这里插入图片描述

runAllTasks()

runAlllTasks方法其实就是从NioEventLoop的taskQueue中不停的取出task并执行。

    protected boolean runAllTasks(long timeoutNanos) {
        ...
        Runnable task = pollTask();
        ...
        for (;;) {
            safeExecute(task);
			...
            task = pollTask();
            if (task == null) {
                ...
                break;
            }
        }
		...
    }

可以看到,就是在一个for循环中,pollTask()方法取出task,然后在下一轮循环中调用safeExecute(task)去执行,safeExecute(task)里面就是调用task.run()方法直接执行,没什么好看的。如果pollTask()方法取出的task为null,那么就break结束循环。


网站公告

今日签到

点亮在社区的每一天
去签到