[netty5: ByteToMessageCodec & MessageToByteEncoder & ByteToMessageDecoder]-源码分析

发布于:2025-07-05 ⋅ 阅读:(19) ⋅ 点赞:(0)

ByteToMessageCodec

ByteToMessageCodec 是一个结合了 ByteToMessageDecoder 和 MessageToByteEncoder 的编解码器,可以实时地将字节流编码或解码为消息,反之亦然。

public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {

    private final TypeParameterMatcher outboundMsgMatcher;
    private final MessageToByteEncoder<I> encoder;

    private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
        @Override
        public void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
            ByteToMessageCodec.this.decode(ctx, in);
        }

        @Override
        protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
            ByteToMessageCodec.this.decodeLast(ctx, in);
        }
    };

    protected ByteToMessageCodec() {
        this((BufferAllocator) null);
    }

    protected ByteToMessageCodec(Class<? extends I> outboundMessageType) {
        this(outboundMessageType, null);
    }

    protected ByteToMessageCodec(BufferAllocator allocator) {
        outboundMsgMatcher = TypeParameterMatcher.find(this, ByteToMessageCodec.class, "I");
        encoder = new Encoder(allocator);
    }

    protected ByteToMessageCodec(Class<? extends I> outboundMessageType, BufferAllocator allocator) {
        outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType);
        encoder = new Encoder(allocator);
    }

    @Override
    public final boolean isSharable() {
        return false;
    }

    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return outboundMsgMatcher.match(msg);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        decoder.channelRead(ctx, msg);
    }

    @Override
    public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
        return encoder.write(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        decoder.channelReadComplete(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        decoder.channelInactive(ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        try {
            decoder.handlerAdded(ctx);
        } finally {
            encoder.handlerAdded(ctx);
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        try {
            decoder.handlerRemoved(ctx);
        } finally {
            encoder.handlerRemoved(ctx);
        }
    }

    protected abstract void encode(ChannelHandlerContext ctx, I msg, Buffer out) throws Exception;
    protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;

    protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
        if (in.readableBytes() > 0) {
            decode(ctx, in);
        }
    }

    private final class Encoder extends MessageToByteEncoder<I> {
        private final BufferAllocator allocator;

        Encoder(BufferAllocator allocator) {
            this.allocator = allocator;
        }

        @Override
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return ByteToMessageCodec.this.acceptOutboundMessage(msg);
        }

        @Override
        protected Buffer allocateBuffer(ChannelHandlerContext ctx, I msg) throws Exception {
            BufferAllocator alloc = allocator != null? allocator : ctx.bufferAllocator();
            return alloc.allocate(256);
        }

        @Override
        protected void encode(ChannelHandlerContext ctx, I msg, Buffer out) throws Exception {
            ByteToMessageCodec.this.encode(ctx, msg, out);
        }
    }
}

MessageToByteEncoder

MessageToByteEncoder 是一个用于将消息编码为字节流的抽象类。它继承自 ChannelHandlerAdapter,并通过两个抽象方法来实现编码功能:allocateBuffer 用于分配一个 Buffer,encode 用于将消息编码到 Buffer 中。它通过 acceptOutboundMessage 方法决定是否处理给定的消息类型,并在 write 方法中执行编码操作。

public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter {

    private final TypeParameterMatcher matcher;
    
    protected MessageToByteEncoder() {
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
    }

    protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
        matcher = TypeParameterMatcher.get(requireNonNull(outboundMessageType, "outboundMessageType"));
    }
    
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

    @Override
    public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
        Buffer buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast);
                try (AutoCloseable ignore = autoClosing(cast)) {
                    encode(ctx, cast, buf);
                }

                if (buf.readableBytes() > 0) {
                    Future<Void> f = ctx.write(buf);
                    buf = null;
                    return f;
                }
                return ctx.write(ctx.bufferAllocator().allocate(0));
            }
            return ctx.write(msg);
        } catch (EncoderException e) {
            return ctx.newFailedFuture(e);
        } catch (Throwable e) {
            return ctx.newFailedFuture(new EncoderException(e));
        } finally {
            if (buf != null) {
                buf.close();
            }
        }
    }

    protected abstract Buffer allocateBuffer(ChannelHandlerContext ctx, I msg) throws Exception;

    protected abstract void encode(ChannelHandlerContext ctx, I msg, Buffer out) throws Exception;
}

ByteToMessageDecoder

ByteToMessageDecoder 是 Netty 5 中用于处理 TCP 粘包拆包的解码器基础类。它通过维护一个累积缓冲区 cumulation,将接收到的 Buffer 连续拼接、缓存,并在适当时机调用 decode(…) 方法将字节流转为高层消息对象

public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {

    public static final Cumulator MERGE_CUMULATOR = new MergeCumulator();

    public static final Cumulator COMPOSITE_CUMULATOR = new CompositeBufferCumulator();

    private final int discardAfterReads = 16;
    
    private final Cumulator cumulator;

	// 累积的字节缓冲区,保存之前未解码完的字节
    private Buffer cumulation;
	// 是否每次只解码一条消息,默认关闭以提升性能,开启后适合协议升级场景
    private boolean singleDecode;
    // 标记是否是第一次解码调用
    private boolean first;

	// 标记本次 decode() 是否产出并传递了消息,决定是否需要继续读。
    private boolean firedChannelRead;
    // 标记当前的 channelRead 是 decoder 主动触发的,防止重复触发 read()。
    private boolean selfFiredChannelRead;

	// 统计读操作次数
    private int numReads;
    // 包装的上下文对象
    private ByteToMessageDecoderContext context;

    protected ByteToMessageDecoder() {
        this(MERGE_CUMULATOR);
    }

    protected ByteToMessageDecoder(Cumulator cumulator) {
        this.cumulator = requireNonNull(cumulator, "cumulator");
    }

    @Override
    public final boolean isSharable() {
        // Can't be sharable as we keep state.
        return false;
    }

    public void setSingleDecode(boolean singleDecode) {
        this.singleDecode = singleDecode;
    }

    public boolean isSingleDecode() {
        return singleDecode;
    }
    
    protected int actualReadableBytes() {
        return internalBuffer().readableBytes();
    }

    protected Buffer internalBuffer() {
        return cumulation;
    }

    @Override
    public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        context = new ByteToMessageDecoderContext(ctx);
        handlerAdded0(context);
    }

    protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Buffer buf = cumulation;
        if (buf != null) {
            cumulation = null;
            numReads = 0;
            int readable = buf.readableBytes();
            if (readable > 0) {
                ctx.fireChannelRead(buf);
                ctx.fireChannelReadComplete();
            } else {
                buf.close();
            }
        }
        handlerRemoved0(context);
    }

    protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }

	// 1. TCP 是流式协议,发送方即便发送了一整条消息,接收方可能会分多次读取数据
	// 2. 每次读取都触发channelRead,读取到的msg是Buffer类型,内容可能不完整,需要累积合并到 cumulation
	// 3. 每次读取都会触发callDecode,去尝试根据现有的数据,进行解码,如果成功则向调用链传递结果,否则啥也不干,等待下次 channelRead
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Buffer) {
            selfFiredChannelRead = true;
            try {
                Buffer data = (Buffer) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.bufferAllocator(), cumulation, data);
                }
                assert context.delegatingCtx() == ctx || ctx == context;
				// 尝试解码
                callDecode(context, cumulation);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
            	// 如果数据已读尽且无剩余,释放 buffer
                if (cumulation != null && cumulation.readableBytes() == 0) {
                    numReads = 0;
                    if (cumulation.isAccessible()) {
                        cumulation.close();
                    }
                    cumulation = null;
                } else if (++numReads >= discardAfterReads) {
                	// 如果累计读取次数到达阈值,主动丢弃已消费字节,防止 cumulation 越来越大
                    numReads = 0;
                    discardSomeReadBytes();
                }

				// 跟踪本次读数据是否至少成功解码过一次, 向下游传播了消息
                firedChannelRead |= context.fireChannelReadCallCount() > 0;
                // 状态清理
                context.reset();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

	// 在 channelRead 之后,执行 channelReadComplete
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    	// 重置读取次数,清理 cumulation 的无效字节。
        numReads = 0;
        discardSomeReadBytes();
        // 解码还未成功过,自动读取关闭,在 channelRead完成后,执行本方法时,手动触发下一次读取
        if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
            ctx.read(); 
        }
        // 重置状态标志位并通知下游 handler。
        firedChannelRead = false;
        selfFiredChannelRead = false;
        ctx.fireChannelReadComplete();
    }

    protected final void discardSomeReadBytes() {
    	// 丢弃 cumulation 中已消费的字节。只在存在历史累积(!first)时执行
        if (cumulation != null && !first) {
            cumulator.discardSomeReadBytes(cumulation);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        assert context.delegatingCtx() == ctx || ctx == context;
        channelInputClosed(context, true);
    }

    @Override
    public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) throws Exception {
        ctx.fireChannelShutdown(direction);
        if (direction == ChannelShutdownDirection.Inbound) {
            assert context.delegatingCtx() == ctx || ctx == context;
            channelInputClosed(context, false);
        }
    }

	// Channel 被关闭时,尝试从 cumulation 中解码剩余数据,释放资源,并向下传递 fire* 事件
    private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
        try {
            channelInputClosed(ctx);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null) {
                cumulation.close();
                cumulation = null;
            }
            if (ctx.fireChannelReadCallCount() > 0) {
                ctx.reset();
                ctx.fireChannelReadComplete();
            }
            if (callChannelInactive) {
                ctx.fireChannelInactive();
            }
        }
    }

    void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
        if (cumulation != null) {
            callDecode(ctx, cumulation);
            if (!ctx.isRemoved()) {
                if (cumulation == null) {
                    try (Buffer buffer = ctx.bufferAllocator().allocate(0)) {
                        decodeLast(ctx, buffer);
                    }
                } else {
                    decodeLast(ctx, cumulation);
                }
            }
        } else {
            try (Buffer buffer = ctx.bufferAllocator().allocate(0)) {
                decodeLast(ctx, buffer);
            }
        }
    }

    void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
        try {
        	// 当仍有未解码的数据 (in.readableBytes() > 0) 且当前 Handler 还在 pipeline 中(未被移除),就继续调用 decode() 试图解码更多消息。
            while (in.readableBytes() > 0 && !ctx.isRemoved()) {

                int oldInputLength = in.readableBytes();
                int numReadCalled = ctx.fireChannelReadCallCount();
                decodeRemovalReentryProtection(ctx, in);
                if (ctx.isRemoved()) {
                    break;
                }

                if (numReadCalled == ctx.fireChannelReadCallCount()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

    protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
        if (in.readableBytes() > 0) {
            decodeRemovalReentryProtection(ctx, in);
        }
    }
    
    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in) throws Exception {
        decode(ctx, in);
    }
    
    // 建议结合 FixedLengthFrameDecoder阅读,理解decode方法做了什么
    protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
    
	// ...
}

ByteToMessageDecoderContext

ByteToMessageDecoderContext 是 Netty 为解码器设计的包装上下文,用于统计 fireChannelRead 次数,以精确控制解码行为和数据流转。

static final class ByteToMessageDecoderContext extends DelegatingChannelHandlerContext {
    private int fireChannelReadCalled;

    private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
        super(ctx);
    }

    void reset() {
        fireChannelReadCalled = 0;
    }

    int fireChannelReadCallCount() {
        return fireChannelReadCalled;
    }

    @Override
    public ChannelHandlerContext fireChannelRead(Object msg) {
        fireChannelReadCalled ++;
        super.fireChannelRead(msg);
        return this;
    }
}

Cumulator

Cumulator 接口用于处理和累积多个 Buffer 数据。它有两个主要方法,分别负责将多个 Buffer 合并成一个更大的 Buffer,并管理已经处理过的数据。

public interface Cumulator {
	// BufferAllocator alloc:缓冲区分配器,用于分配新的 Buffer
	// Buffer cumulation:当前的累积数据缓冲区
	// Buffer in:新的输入数据缓冲区
	// 将多个 Buffer 合并成一个新的 Buffer,即将当前 Buffer 中的可读数据与新的数据合并
    Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);
    // 丢弃缓冲区中已经读取的数据,返回一个新的缓冲区,去除了之前已处理过的部分
    Buffer discardSomeReadBytes(Buffer cumulation);
}
CompositeBufferCumulator

CompositeBufferCumulator 适用于大量 Buffer 分段输入时,通过复合缓冲区高效拼接,避免数据拷贝和缓冲区重分配。

private static final class CompositeBufferCumulator implements Cumulator {
    @Override
    public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
    	// 如果累加缓冲区 cumulation 没有可读字节了,直接释放并返回输入缓冲区 in
        if (cumulation.readableBytes() == 0) {
            cumulation.close();
            return in;
        }
        try (in) {
        	// 输入缓冲区 in 没有可读字节,保留原来的 cumulation。
            if (in.readableBytes() == 0) {
                return cumulation;
            }
            
			// 如果累加缓冲区是只读的,复制出一个可写副本
            if (cumulation.readOnly()) {
                Buffer tmp = cumulation.copy();
                cumulation.close();
                cumulation = tmp;
            }
            // 如果当前累加区已经是 CompositeBuffer,直接扩展进去
            if (CompositeBuffer.isComposite(cumulation)) {
                CompositeBuffer composite = (CompositeBuffer) cumulation;
                composite.extendWith(prepareInForCompose(in));
                return composite;
            }
            return alloc.compose(Arrays.asList(cumulation.send(), prepareInForCompose(in)));
        }
    }

	// 确保 in 是只读安全的后,调用 send() 移交所有权用于合并。
    private static Send<Buffer> prepareInForCompose(Buffer in) {
        return in.readOnly() ? in.copy().send() : in.send();
    }

	// 通过 readSplit(0) 剪掉已经读过的部分,释放空间。相比 compact() 更适合复合缓冲区(CompositeBuffer)
    @Override
    public Buffer discardSomeReadBytes(Buffer cumulation) {
        cumulation.readSplit(0).close();
        return cumulation;
    }

    @Override
    public String toString() {
        return "CompositeBufferCumulator";
    }
}
MergeCumulator

MergeCumulator 更适合频繁接收小块数据的场景,追求高访问性能和代码简单;但当数据量变大时,可能因为频繁扩容带来复制开销。

    private static final class MergeCumulator implements Cumulator {
        @Override
        public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
        	// 如果当前累积的 Buffer 为空
            if (cumulation.readableBytes() == 0) {
                cumulation.close();
                return in;
            }

            try (in) {
                final int required = in.readableBytes();
                // 如果当前的累积 Buffer 没有足够的空间,或者它是只读,就扩展大小。
                if (required > cumulation.writableBytes() || cumulation.readOnly()) {
                    return expandCumulationAndWrite(alloc, cumulation, in);
                }
                cumulation.writeBytes(in);
                return cumulation;
            }
        }

		// 如果 cumulation 中的已读字节数超过可写字节数(readerOffset() > writableBytes()),则调用 compact() 方法来压缩缓冲区,移除已读的数据
        @Override
        public Buffer discardSomeReadBytes(Buffer cumulation) {
            if (cumulation.readerOffset() > cumulation.writableBytes()) {
                cumulation.compact();
            }
            return cumulation;
        }


        private static Buffer expandCumulationAndWrite(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
        	// 1. 计算新的 Buffer 大小:计算新的 Buffer 大小,确保它能够容纳当前的 cumulation 和输入的 Buffer。新的大小是当前已读字节和输入字节总和的下一个最接近的 2 的幂次方。
            final int newSize = safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
            // 创建新的 Buffer:根据是否是只读的 Buffer 来决定如何创建新的 Buffer。如果是只读的,会重新分配内存;否则,直接扩展当前的 Buffer。
            Buffer newCumulation = oldCumulation.readOnly() ? alloc.allocate(newSize) :
                    oldCumulation.ensureWritable(newSize);
			// 3. 将旧数据和新数据写入新 Buffer:如果创建了新的 Buffer,将旧的 cumulation 和输入的 Buffer 都写入新的 Buffer。
            try {
                if (newCumulation != oldCumulation) {
                    newCumulation.writeBytes(oldCumulation);
                }
                newCumulation.writeBytes(in);
                return newCumulation;
            } finally {
            	// 4. 关闭旧的 Buffer:如果创建了新的 Buffer,则关闭原先的 cumulation,释放内存。
                if (newCumulation != oldCumulation) {
                    oldCumulation.close();
                }
            }
        }

        @Override
        public String toString() {
            return "MergeCumulator";
        }
    }
}

FixedLengthFrameDecoder

在此引入这个类,只是因为这个类最简单,方便大家理解 解码器的工作流程。

  1. FixedLengthFrameDecoder:按照固定的字节长度切割每一帧,适用于每条消息长度一致的协议(如定长二进制协议)。
  2. LengthFieldBasedFrameDecoder:根据消息中指定位置的“长度字段”值来动态切割完整帧,适用于二进制协议。
  3. LineBasedFrameDecoder:以 \n 或 \r\n 为分隔符,将每行作为一帧,适用于基于文本的行协议。
  4. DelimiterBasedFrameDecoder:使用自定义的分隔符(如 $_、#END#)拆分帧,适用于定界符结束的文本协议。
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }

    public FixedLengthFrameDecoder(int frameLength, Cumulator cumulator) {
        super(cumulator);
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }

	// ByteToMessageDecoder(channelRead[首次读] -> callDecode[循环读取数据]
	// ->
	// decodeRemovalReentryProtection) 
	// -> 
	// FixedLengthFrameDecoder.decode
    @Override
    protected final void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
        Object decoded = decode0(ctx, in);
        if (decoded != null) {
             // 只要解码成功,就向下传递结果
        	// ByteToMessageDecoderContext.fireChannelRead
            ctx.fireChannelRead(decoded);
        }
    }
    
    protected Object decode0(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, Buffer in) throws Exception {
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readSplit(frameLength);
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到