[netty5: MessageAggregator & HttpObjectAggregator]-源码解析

发布于:2025-07-07 ⋅ 阅读:(19) ⋅ 点赞:(0)

在阅读这篇文章前,推荐先阅读

  1. [netty5: ByteToMessageCodec & MessageToByteEncoder & ByteToMessageDecoder]-源码分析
  2. [netty5: HttpObject]-源码解析

100-continue

100-continue 是 HTTP/1.1 协议中的一种机制,用于客户端在发送大体积请求体(如文件上传)前,先向服务器发送一个带有 Expect: 100-continue 头的请求,询问服务器是否准备好接收请求体;服务器如果准备好了,会返回 100 Continue 响应,客户端才开始发送实际数据,从而避免不必要的大数据传输。

MessageAggregator

MessageAggregator<I, S, C, A> 是一个高度可复用的通用聚合器框架,适用于各种流式协议的“分段消息聚合”场景:

功能 说明
启动聚合 tryStartMessage() 检测到起始消息,初始化聚合
聚合中 tryContentMessage() 检测到内容消息,append 内容
完成聚合 isLastContentMessage() 判断是否是最后一块,调用 finishAggregation
超长控制 maxContentLength + lengthForContent 控制体积
特殊控制 支持 100-continue、异常处理、复用 context listener
public abstract class MessageAggregator<I, S, C extends AutoCloseable, A extends AutoCloseable> extends MessageToMessageDecoder<I> {

	// 当前正在聚合的完整消息(例如 FullHttpRequest 或 FullHttpResponse)
    private A currentMessage;

	// 聚合内容允许的最大字节数,超过则触发 handleOversizedMessage
    private final int maxContentLength;
    // 标识当前是否正在处理超长消息,避免重复处理。
    private boolean handlingOversizedMessage;
    
    private ChannelHandlerContext ctx;
	// 用于监听 100-Continue 响应写入完成后的回调
    private FutureContextListener<ChannelHandlerContext, Void> continueResponseWriteListener;
	// 标识是否正在聚合过程中
    private boolean aggregating;
    // 是否在通道关闭时处理未完成的聚合
    private boolean handleIncompleteAggregateDuringClose = true;

    protected MessageAggregator(int maxContentLength) {
        this.maxContentLength = maxContentLength;
    }
    
    @Override
    public boolean acceptInboundMessage(Object msg) throws Exception {
        if (!super.acceptInboundMessage(msg)) {
            return false;
        }

        if (isAggregated(msg)) {
            return false;
        }
        
        if (tryStartMessage(msg) != null) {
            return true;
        }
        return aggregating && tryContentMessage(msg) != null;
    }
    
    @Override
    protected void decode(final ChannelHandlerContext ctx, I msg) throws Exception {
		// 1. 判断是否为新消息起始(startMsg)
    	// 判断当前收到的 msg 是否为 HttpMessage,如果是,则开始处理聚合。
        final S startMsg = tryStartMessage(msg);
        if (startMsg != null) {
            aggregating = true;
            handlingOversizedMessage = false;
            // 如果已存在未完成的 currentMessage,说明消息异常,抛出 MessageAggregationException
            if (currentMessage != null) {
                currentMessage.close();
                currentMessage = null;
                throw new MessageAggregationException();
            }
            
            // 2. 处理 100-continue 相关响应(continueResponse)

			// newContinueResponse 的核心逻辑:
			// - 如果请求头中包含 Expect: 100-continue,并且请求体大小没有超过 maxContentLength,则返回一个 100 Continue 响应;
			// - 如果请求体过大(Content-Length > maxContentLength),则返回一个 413 Request Entity Too Large 错误响应;
			// - 如果不符合任何条件,返回 null,表示不需要继续响应。
            Object continueResponse = newContinueResponse(startMsg, maxContentLength, ctx.pipeline());
            if (continueResponse != null) {
            	// 构造监听器
                FutureContextListener<ChannelHandlerContext, Void> listener = continueResponseWriteListener;
                if (listener == null) {
                    continueResponseWriteListener = listener = (context, future) -> {
                        if (future.isFailed()) {
                            context.fireChannelExceptionCaught(future.cause());
                        }
                    };
                }

				// 判断在收到 100-continue 响应后是否关闭连接,条件是配置了关闭标志且响应表示应忽略后续内容。
                boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
                // 	// 判断响应是否为客户端错误(4xx),如果是,则说明应忽略后续内容。
                handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);

				// 写出响应并监听结果
                Future<Void> future = ctx.writeAndFlush(continueResponse).addListener(ctx, listener);

                if (closeAfterWrite) {
                    handleIncompleteAggregateDuringClose = false;
                    future.addListener(ctx, ChannelFutureListeners.CLOSE);
                    return;
                }
                // 判断是否忽略后续内容
                if (handlingOversizedMessage) {
                    return;
                }
            } else if (isContentLengthInvalid(startMsg, maxContentLength)) {
            	// 3. 检查请求长度是否合法
                invokeHandleOversizedMessage(ctx, startMsg);
                return;
            }

			// 4. 处理起始消息中已有的解码失败情况
            if (startMsg instanceof DecoderResultProvider &&
                    !((DecoderResultProvider) startMsg).decoderResult().isSuccess()) {
                final A aggregated = beginAggregation(ctx.bufferAllocator(), startMsg);
                finishAggregation(ctx.bufferAllocator(), aggregated);
                ctx.fireChannelRead(aggregated);
                return;
            }

			// 5. 初始化新消息聚合对象
			// 创建一个聚合消息实例(包含起始行和一个空的内容缓冲区),等待后续内容片段加入。
            currentMessage = beginAggregation(ctx.bufferAllocator(), startMsg);
            return;
        }

		// 6. 处理内容消息(contentMsg)

		// 先判断 msg 是否是消息体片段,如果不是则抛异常。
        final C contentMsg = tryContentMessage(msg);
        if (contentMsg != null) {
        	// 如果还没有初始化的聚合消息(即没有起始消息),忽略该内容。
            if (currentMessage == null) {
                return;
            }
			// 检查聚合后长度是否超限,超限则调用超长消息处理。
            if (lengthForAggregation(currentMessage) > maxContentLength - lengthForContent(contentMsg)) {
                invokeHandleOversizedMessage(ctx, currentMessage);
                return;
            }

			// 调用 aggregate 将当前内容片段追加到聚合消息。
            aggregate(ctx.bufferAllocator(), currentMessage, contentMsg);

            final boolean last;
			// 检查是否为消息最后一片(last)
            if (contentMsg instanceof DecoderResultProvider) {
                DecoderResult decoderResult = ((DecoderResultProvider) contentMsg).decoderResult();
                if (!decoderResult.isSuccess()) {
                    if (currentMessage instanceof DecoderResultProvider) {
                        ((DecoderResultProvider) currentMessage).setDecoderResult(DecoderResult.failure(decoderResult.cause()));
                    }
                    last = true;
                } else {
                    last = isLastContentMessage(contentMsg);
                }
            } else {
                last = isLastContentMessage(contentMsg);
            }

			// 如果是,完成聚合,清理状态,向下游传递完整消息。
            if (last) {
                finishAggregation0(ctx.bufferAllocator(), currentMessage);

                // All done
                A message = currentMessage;
                currentMessage = null;
                ctx.fireChannelRead(message);
            }
        } else {
            throw new MessageAggregationException();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (currentMessage != null && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
            ctx.read();
        }
        ctx.fireChannelReadComplete();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (aggregating && handleIncompleteAggregateDuringClose) {
            ctx.fireChannelExceptionCaught(new PrematureChannelClosureException("Channel closed while still aggregating message"));
        }
        try {
            super.channelInactive(ctx);
        } finally {
            releaseCurrentMessage();
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        try {
            super.handlerRemoved(ctx);
        } finally {
            releaseCurrentMessage();
        }
    }

    protected final void releaseCurrentMessage() throws Exception {
        if (currentMessage != null) {
            currentMessage.close();
            currentMessage = null;
        }
        handlingOversizedMessage = false;
        aggregating = false;
    }

	// 省略抽象方法,具体看 HttpObjectAggregator
}

HttpObjectAggregator

HttpObjectAggregator 是构建高层 HTTP 服务的基础设施组件,它将分块的 HTTP 请求或响应组装为完整对象,从而简化上层应用逻辑。其设计清晰、可扩展性强,并充分考虑了 Expect: 100-continue 与 Content-Length 异常等 HTTP 协议边界情况,是非常值得借鉴的聚合处理器实现。

public class HttpObjectAggregator<C extends HttpContent<C>>
        extends MessageAggregator<HttpObject, HttpMessage, HttpContent<C>, FullHttpMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(HttpObjectAggregator.class);

	// 当检测到客户端发送了 100-continue 期望但请求内容过大时,是否关闭连接;
	// 为 true 则直接关闭连接,防止浪费资源,
	// 为 false 则保持连接打开并继续读取和丢弃数据直到下一请求。
    private final boolean closeOnExpectationFailed;

    public HttpObjectAggregator(int maxContentLength) {
        this(maxContentLength, false);
    }

    public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {
        super(maxContentLength);
        this.closeOnExpectationFailed = closeOnExpectationFailed;
    }

    @Override
    protected HttpMessage tryStartMessage(Object msg) {
        return msg instanceof HttpMessage ? (HttpMessage) msg : null;
    }

    @SuppressWarnings("unchecked")
    @Override
    protected HttpContent<C> tryContentMessage(Object msg) {
        return msg instanceof HttpContent ? (HttpContent<C>) msg : null;
    }

    @Override
    protected boolean isAggregated(Object msg) throws Exception {
        return msg instanceof FullHttpMessage;
    }

    @Override
    protected int lengthForContent(HttpContent<C> msg) {
        return msg.payload().readableBytes();
    }

    @Override
    protected int lengthForAggregation(FullHttpMessage<?> msg) {
        return msg.payload().readableBytes();
    }

    @Override
    protected boolean isLastContentMessage(HttpContent<C> msg) throws Exception {
        return msg instanceof LastHttpContent;
    }

    @Override
    protected boolean isContentLengthInvalid(HttpMessage start, int maxContentLength) {
        try {
            return getContentLength(start, -1L) > maxContentLength;
        } catch (final NumberFormatException e) {
            return false;
        }
    }

	// 根据请求的 Expectation 头判断是否返回 100 Continue 或错误响应(如 417 或 413),并在不支持或内容过大时触发相应事件
    private static FullHttpResponse continueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
    	// 根据请求的 Expectation 头判断是否支持期望,若不支持触发失败事件返回 417;
        if (HttpUtil.isUnsupportedExpectation(start)) {
            pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
            return newErrorResponse(EXPECTATION_FAILED, pipeline.channel().bufferAllocator(), true, false);
        }
        if (HttpUtil.is100ContinueExpected(start)) {
            // 若期望 100-continue 且内容长度未超限,返回 100 Continue 响应,
            if (getContentLength(start, -1L) <= maxContentLength) {
                return newErrorResponse(CONTINUE, pipeline.channel().bufferAllocator(), false, false);
            }
            // 否则触发失败事件并返回 413 请求体过大响应。
            pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
            return newErrorResponse(REQUEST_ENTITY_TOO_LARGE, pipeline.channel().bufferAllocator(), true, false);
        }

        return null;
    }

	// 根据请求创建一个 100-continue 响应,并在响应生成后移除请求中的 Expect 头。
    @Override
    protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
        FullHttpResponse response = continueResponse(start, maxContentLength, pipeline);
        if (response != null) {
            start.headers().remove(EXPECT);
        }
        return response;
    }

	// 判断在收到 100-continue 响应后是否关闭连接,条件是配置了关闭标志且响应表示应忽略后续内容。
    @Override
    protected boolean closeAfterContinueResponse(Object msg) {
        return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);
    }

	// 判断响应是否为客户端错误(4xx),如果是,则说明应忽略后续内容。
    @Override
    protected boolean ignoreContentAfterContinueResponse(Object msg) {
        if (msg instanceof HttpResponse) {
            final HttpResponse httpResponse = (HttpResponse) msg;
            return httpResponse.status().codeClass() == HttpStatusClass.CLIENT_ERROR;
        }
        return false;
    }

	// 开始对一个非完整的 HTTP 消息进行聚合,移除分块传输编码标记,并创建一个对应请求或响应类型的空聚合消息,准备接收后续内容。
    @Override
    protected FullHttpMessage<?> beginAggregation(BufferAllocator allocator, HttpMessage start) throws Exception {
        assert !(start instanceof FullHttpMessage);

		// 移除 HTTP 消息头中的 Transfer-Encoding: chunked,以便后续使用聚合后的 Content-Length
        HttpUtil.setTransferEncodingChunked(start, false);

        final CompositeBuffer content = allocator.compose();
        FullHttpMessage<?> ret;
        
        // 根据消息类型创建对应的聚合消息并初始化其 payload 为一个可扩展的空 CompositeBuffer,用于后续追加内容块。
        if (start instanceof HttpRequest) {
            ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
        } else if (start instanceof HttpResponse) {
            ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
        } else {
            throw new Error();
        }
        return ret;
    }

	// 将内容块追加到聚合消息中,并在遇到最后一块时设置尾部头信息
    @Override
    protected void aggregate(BufferAllocator allocator, FullHttpMessage<?> aggregated, HttpContent<C> content) throws Exception {
        final CompositeBuffer payload = (CompositeBuffer) aggregated.payload();
        payload.extendWith(content.payload().send());
        if (content instanceof LastHttpContent) {
            ((AggregatedFullHttpMessage<?>) aggregated).setTrailingHeaders(((LastHttpContent<?>) content).trailingHeaders());
        }
    }

	// 完成聚合时,如果未设置 Content-Length,则自动设置为聚合内容的实际长度。
    @Override
    protected void finishAggregation(BufferAllocator allocator, FullHttpMessage<?> aggregated) throws Exception {
        if (!HttpUtil.isContentLengthSet(aggregated)) {
            aggregated.headers().set(CONTENT_LENGTH, String.valueOf(aggregated.payload().readableBytes()));
        }
    }

	// 处理超大 HTTP 消息
    @Override
    protected void handleOversizedMessage(final ChannelHandlerContext ctx, Object oversized) throws Exception {
        if (oversized instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) oversized;
            // 条件1:如果是完整请求(FullHttpMessage)或者请求既不期待100-continue也不保持连接
            if (oversized instanceof FullHttpMessage || !HttpUtil.is100ContinueExpected(request) && !HttpUtil.isKeepAlive(request)) {
            	// 发送带关闭连接指示的413错误响应
                Future<Void> future = ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, true));
                future.addListener(f -> {
                    if (f.isFailed()) {
                    	// 日志打印发送失败的原因
                        logger.debug("Failed to send a 413 Request Entity Too Large.", f.cause());
                    }
                     // 响应发送后关闭连接
                    ctx.close();
                });
            } else {
            	// 条件2:请求期待100-continue或者保持连接时,发送不关闭连接的413响应
                ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, false))
                        .addListener(future -> {
                            if (future.isFailed()) {
                            	// 发送失败时日志记录并关闭连接
                                logger.debug("Failed to send a 413 Request Entity Too Large.", future.cause());
                                ctx.close();
                            }
                        });
            }
        } else if (oversized instanceof HttpResponse) {
             // 如果是超大的响应,直接抛出异常,可能交由上层处理
            throw new ResponseTooLargeException("Response entity too large: " + oversized);
        } else {
            // 既不是请求也不是响应,视为非法状态,抛异常
            throw new IllegalStateException();
        }
    }

    @Override
    public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.channelExceptionCaught(ctx, cause);
        if (cause instanceof ResponseTooLargeException) {
            ctx.close();
        }
    }

	// 该方法用于生成一个指定状态码的空响应,并根据参数决定是否关闭连接和设置内容长度
    private static FullHttpResponse newErrorResponse(HttpResponseStatus status, BufferAllocator allocator, boolean emptyContent, boolean closeConnection) {
    	// 根据传入的状态码 status,创建一个空内容的 FullHttpResponse,
        FullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, status, allocator.allocate(0));
        // 如果 emptyContent 为 true,则设置响应头 Content-Length 为 0,
        if (emptyContent) {
            resp.headers().set(CONTENT_LENGTH, HttpHeaderValues.ZERO);
        }
        // 如果 closeConnection 为 true,则设置响应头 Connection: close,表示连接关闭。
        if (closeConnection) {
            resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
        }
        return resp;
    }
}

AggregatedFullHttpMessage

private abstract static class AggregatedFullHttpMessage<R extends FullHttpMessage<R>> implements FullHttpMessage<R> {
    protected final HttpMessage message;
    private final Buffer payload;
    private HttpHeaders trailingHeaders;

    AggregatedFullHttpMessage(HttpMessage message, Buffer payload, HttpHeaders trailingHeaders) {
        this.message = message;
        this.payload = payload;
        this.trailingHeaders = trailingHeaders;
    }

    @Override
    public void close() {
        payload.close();
    }

    @Override
    public boolean isAccessible() {
        return payload.isAccessible();
    }

    @Override
    public Buffer payload() {
        return payload;
    }

    @Override
    public HttpHeaders trailingHeaders() {
        HttpHeaders trailingHeaders = this.trailingHeaders;
        return requireNonNullElse(trailingHeaders, HttpHeaders.emptyHeaders());
    }

    void setTrailingHeaders(HttpHeaders trailingHeaders) {
        this.trailingHeaders = trailingHeaders;
    }

    @Override
    public HttpVersion getProtocolVersion() {
        return message.protocolVersion();
    }

    @Override
    public HttpVersion protocolVersion() {
        return message.protocolVersion();
    }

    @Override
    public FullHttpMessage<R> setProtocolVersion(HttpVersion version) {
        message.setProtocolVersion(version);
        return this;
    }

    @Override
    public HttpHeaders headers() {
        return message.headers();
    }

    @Override
    public DecoderResult decoderResult() {
        return message.decoderResult();
    }

    @Override
    public void setDecoderResult(DecoderResult result) {
        message.setDecoderResult(result);
    }
}

AggregatedFullHttpRequest

private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage<FullHttpRequest> implements FullHttpRequest {

    AggregatedFullHttpRequest(HttpRequest request, Buffer content, HttpHeaders trailingHeaders) {
        super(request, content, trailingHeaders);
    }

    @Override
    public Send<FullHttpRequest> send() {
        return payload().send().map(FullHttpRequest.class,
                p -> new AggregatedFullHttpRequest(this, p, trailingHeaders()));
    }

    @Override
    public AggregatedFullHttpRequest copy() {
        return new AggregatedFullHttpRequest(this, payload().copy(), trailingHeaders().copy());
    }

    @Override
    public FullHttpRequest touch(Object hint) {
        payload().touch(hint);
        return this;
    }

    @Override
    public FullHttpRequest setMethod(HttpMethod method) {
        ((HttpRequest) message).setMethod(method);
        return this;
    }

    @Override
    public FullHttpRequest setUri(String uri) {
        ((HttpRequest) message).setUri(uri);
        return this;
    }

    @Override
    public HttpMethod method() {
        return ((HttpRequest) message).method();
    }

    @Override
    public String uri() {
        return ((HttpRequest) message).uri();
    }

    @Override
    public FullHttpRequest setProtocolVersion(HttpVersion version) {
        super.setProtocolVersion(version);
        return this;
    }

    @Override
    public String toString() {
        return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();
    }
}

AggregatedFullHttpResponse

private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage<FullHttpResponse> implements FullHttpResponse {

    AggregatedFullHttpResponse(HttpResponse message, Buffer content, HttpHeaders trailingHeaders) {
        super(message, content, trailingHeaders);
    }

    @Override
    public Send<FullHttpResponse> send() {
        return payload().send().map(FullHttpResponse.class,
                p -> new AggregatedFullHttpResponse(this, p, trailingHeaders()));
    }

    @Override
    public AggregatedFullHttpResponse copy() {
        return new AggregatedFullHttpResponse(this, payload().copy(), trailingHeaders().copy());
    }

    @Override
    public FullHttpResponse touch(Object hint) {
        payload().touch(hint);
        return this;
    }

    @Override
    public FullHttpResponse setStatus(HttpResponseStatus status) {
        ((HttpResponse) message).setStatus(status);
        return this;
    }

    @Override
    public HttpResponseStatus status() {
        return ((HttpResponse) message).status();
    }

    @Override
    public FullHttpResponse setProtocolVersion(HttpVersion version) {
        super.setProtocolVersion(version);
        return this;
    }

    @Override
    public String toString() {
        return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到