在阅读这篇文章前,推荐先阅读
- [netty5: ByteToMessageCodec & MessageToByteEncoder & ByteToMessageDecoder]-源码分析
- [netty5: HttpObject]-源码解析
100-continue
100-continue 是 HTTP/1.1 协议中的一种机制,用于客户端在发送大体积请求体(如文件上传)前,先向服务器发送一个带有 Expect: 100-continue 头的请求,询问服务器是否准备好接收请求体;服务器如果准备好了,会返回 100 Continue 响应,客户端才开始发送实际数据,从而避免不必要的大数据传输。
MessageAggregator
MessageAggregator<I, S, C, A> 是一个高度可复用的通用聚合器框架,适用于各种流式协议的“分段消息聚合”场景:
功能 | 说明 |
---|---|
启动聚合 | tryStartMessage() 检测到起始消息,初始化聚合 |
聚合中 | tryContentMessage() 检测到内容消息,append 内容 |
完成聚合 | isLastContentMessage() 判断是否是最后一块,调用 finishAggregation |
超长控制 | maxContentLength + lengthForContent 控制体积 |
特殊控制 | 支持 100-continue、异常处理、复用 context listener |
public abstract class MessageAggregator<I, S, C extends AutoCloseable, A extends AutoCloseable> extends MessageToMessageDecoder<I> {
// 当前正在聚合的完整消息(例如 FullHttpRequest 或 FullHttpResponse)
private A currentMessage;
// 聚合内容允许的最大字节数,超过则触发 handleOversizedMessage
private final int maxContentLength;
// 标识当前是否正在处理超长消息,避免重复处理。
private boolean handlingOversizedMessage;
private ChannelHandlerContext ctx;
// 用于监听 100-Continue 响应写入完成后的回调
private FutureContextListener<ChannelHandlerContext, Void> continueResponseWriteListener;
// 标识是否正在聚合过程中
private boolean aggregating;
// 是否在通道关闭时处理未完成的聚合
private boolean handleIncompleteAggregateDuringClose = true;
protected MessageAggregator(int maxContentLength) {
this.maxContentLength = maxContentLength;
}
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
if (!super.acceptInboundMessage(msg)) {
return false;
}
if (isAggregated(msg)) {
return false;
}
if (tryStartMessage(msg) != null) {
return true;
}
return aggregating && tryContentMessage(msg) != null;
}
@Override
protected void decode(final ChannelHandlerContext ctx, I msg) throws Exception {
// 1. 判断是否为新消息起始(startMsg)
// 判断当前收到的 msg 是否为 HttpMessage,如果是,则开始处理聚合。
final S startMsg = tryStartMessage(msg);
if (startMsg != null) {
aggregating = true;
handlingOversizedMessage = false;
// 如果已存在未完成的 currentMessage,说明消息异常,抛出 MessageAggregationException
if (currentMessage != null) {
currentMessage.close();
currentMessage = null;
throw new MessageAggregationException();
}
// 2. 处理 100-continue 相关响应(continueResponse)
// newContinueResponse 的核心逻辑:
// - 如果请求头中包含 Expect: 100-continue,并且请求体大小没有超过 maxContentLength,则返回一个 100 Continue 响应;
// - 如果请求体过大(Content-Length > maxContentLength),则返回一个 413 Request Entity Too Large 错误响应;
// - 如果不符合任何条件,返回 null,表示不需要继续响应。
Object continueResponse = newContinueResponse(startMsg, maxContentLength, ctx.pipeline());
if (continueResponse != null) {
// 构造监听器
FutureContextListener<ChannelHandlerContext, Void> listener = continueResponseWriteListener;
if (listener == null) {
continueResponseWriteListener = listener = (context, future) -> {
if (future.isFailed()) {
context.fireChannelExceptionCaught(future.cause());
}
};
}
// 判断在收到 100-continue 响应后是否关闭连接,条件是配置了关闭标志且响应表示应忽略后续内容。
boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
// // 判断响应是否为客户端错误(4xx),如果是,则说明应忽略后续内容。
handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);
// 写出响应并监听结果
Future<Void> future = ctx.writeAndFlush(continueResponse).addListener(ctx, listener);
if (closeAfterWrite) {
handleIncompleteAggregateDuringClose = false;
future.addListener(ctx, ChannelFutureListeners.CLOSE);
return;
}
// 判断是否忽略后续内容
if (handlingOversizedMessage) {
return;
}
} else if (isContentLengthInvalid(startMsg, maxContentLength)) {
// 3. 检查请求长度是否合法
invokeHandleOversizedMessage(ctx, startMsg);
return;
}
// 4. 处理起始消息中已有的解码失败情况
if (startMsg instanceof DecoderResultProvider &&
!((DecoderResultProvider) startMsg).decoderResult().isSuccess()) {
final A aggregated = beginAggregation(ctx.bufferAllocator(), startMsg);
finishAggregation(ctx.bufferAllocator(), aggregated);
ctx.fireChannelRead(aggregated);
return;
}
// 5. 初始化新消息聚合对象
// 创建一个聚合消息实例(包含起始行和一个空的内容缓冲区),等待后续内容片段加入。
currentMessage = beginAggregation(ctx.bufferAllocator(), startMsg);
return;
}
// 6. 处理内容消息(contentMsg)
// 先判断 msg 是否是消息体片段,如果不是则抛异常。
final C contentMsg = tryContentMessage(msg);
if (contentMsg != null) {
// 如果还没有初始化的聚合消息(即没有起始消息),忽略该内容。
if (currentMessage == null) {
return;
}
// 检查聚合后长度是否超限,超限则调用超长消息处理。
if (lengthForAggregation(currentMessage) > maxContentLength - lengthForContent(contentMsg)) {
invokeHandleOversizedMessage(ctx, currentMessage);
return;
}
// 调用 aggregate 将当前内容片段追加到聚合消息。
aggregate(ctx.bufferAllocator(), currentMessage, contentMsg);
final boolean last;
// 检查是否为消息最后一片(last)
if (contentMsg instanceof DecoderResultProvider) {
DecoderResult decoderResult = ((DecoderResultProvider) contentMsg).decoderResult();
if (!decoderResult.isSuccess()) {
if (currentMessage instanceof DecoderResultProvider) {
((DecoderResultProvider) currentMessage).setDecoderResult(DecoderResult.failure(decoderResult.cause()));
}
last = true;
} else {
last = isLastContentMessage(contentMsg);
}
} else {
last = isLastContentMessage(contentMsg);
}
// 如果是,完成聚合,清理状态,向下游传递完整消息。
if (last) {
finishAggregation0(ctx.bufferAllocator(), currentMessage);
// All done
A message = currentMessage;
currentMessage = null;
ctx.fireChannelRead(message);
}
} else {
throw new MessageAggregationException();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (currentMessage != null && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
ctx.read();
}
ctx.fireChannelReadComplete();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (aggregating && handleIncompleteAggregateDuringClose) {
ctx.fireChannelExceptionCaught(new PrematureChannelClosureException("Channel closed while still aggregating message"));
}
try {
super.channelInactive(ctx);
} finally {
releaseCurrentMessage();
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
try {
super.handlerRemoved(ctx);
} finally {
releaseCurrentMessage();
}
}
protected final void releaseCurrentMessage() throws Exception {
if (currentMessage != null) {
currentMessage.close();
currentMessage = null;
}
handlingOversizedMessage = false;
aggregating = false;
}
// 省略抽象方法,具体看 HttpObjectAggregator
}
HttpObjectAggregator
HttpObjectAggregator
是构建高层 HTTP 服务的基础设施组件,它将分块的 HTTP 请求或响应组装为完整对象,从而简化上层应用逻辑。其设计清晰、可扩展性强,并充分考虑了 Expect: 100-continue 与 Content-Length 异常等 HTTP 协议边界情况,是非常值得借鉴的聚合处理器实现。
public class HttpObjectAggregator<C extends HttpContent<C>>
extends MessageAggregator<HttpObject, HttpMessage, HttpContent<C>, FullHttpMessage<?>> {
private static final Logger logger = LoggerFactory.getLogger(HttpObjectAggregator.class);
// 当检测到客户端发送了 100-continue 期望但请求内容过大时,是否关闭连接;
// 为 true 则直接关闭连接,防止浪费资源,
// 为 false 则保持连接打开并继续读取和丢弃数据直到下一请求。
private final boolean closeOnExpectationFailed;
public HttpObjectAggregator(int maxContentLength) {
this(maxContentLength, false);
}
public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {
super(maxContentLength);
this.closeOnExpectationFailed = closeOnExpectationFailed;
}
@Override
protected HttpMessage tryStartMessage(Object msg) {
return msg instanceof HttpMessage ? (HttpMessage) msg : null;
}
@SuppressWarnings("unchecked")
@Override
protected HttpContent<C> tryContentMessage(Object msg) {
return msg instanceof HttpContent ? (HttpContent<C>) msg : null;
}
@Override
protected boolean isAggregated(Object msg) throws Exception {
return msg instanceof FullHttpMessage;
}
@Override
protected int lengthForContent(HttpContent<C> msg) {
return msg.payload().readableBytes();
}
@Override
protected int lengthForAggregation(FullHttpMessage<?> msg) {
return msg.payload().readableBytes();
}
@Override
protected boolean isLastContentMessage(HttpContent<C> msg) throws Exception {
return msg instanceof LastHttpContent;
}
@Override
protected boolean isContentLengthInvalid(HttpMessage start, int maxContentLength) {
try {
return getContentLength(start, -1L) > maxContentLength;
} catch (final NumberFormatException e) {
return false;
}
}
// 根据请求的 Expectation 头判断是否返回 100 Continue 或错误响应(如 417 或 413),并在不支持或内容过大时触发相应事件
private static FullHttpResponse continueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
// 根据请求的 Expectation 头判断是否支持期望,若不支持触发失败事件返回 417;
if (HttpUtil.isUnsupportedExpectation(start)) {
pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
return newErrorResponse(EXPECTATION_FAILED, pipeline.channel().bufferAllocator(), true, false);
}
if (HttpUtil.is100ContinueExpected(start)) {
// 若期望 100-continue 且内容长度未超限,返回 100 Continue 响应,
if (getContentLength(start, -1L) <= maxContentLength) {
return newErrorResponse(CONTINUE, pipeline.channel().bufferAllocator(), false, false);
}
// 否则触发失败事件并返回 413 请求体过大响应。
pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);
return newErrorResponse(REQUEST_ENTITY_TOO_LARGE, pipeline.channel().bufferAllocator(), true, false);
}
return null;
}
// 根据请求创建一个 100-continue 响应,并在响应生成后移除请求中的 Expect 头。
@Override
protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
FullHttpResponse response = continueResponse(start, maxContentLength, pipeline);
if (response != null) {
start.headers().remove(EXPECT);
}
return response;
}
// 判断在收到 100-continue 响应后是否关闭连接,条件是配置了关闭标志且响应表示应忽略后续内容。
@Override
protected boolean closeAfterContinueResponse(Object msg) {
return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);
}
// 判断响应是否为客户端错误(4xx),如果是,则说明应忽略后续内容。
@Override
protected boolean ignoreContentAfterContinueResponse(Object msg) {
if (msg instanceof HttpResponse) {
final HttpResponse httpResponse = (HttpResponse) msg;
return httpResponse.status().codeClass() == HttpStatusClass.CLIENT_ERROR;
}
return false;
}
// 开始对一个非完整的 HTTP 消息进行聚合,移除分块传输编码标记,并创建一个对应请求或响应类型的空聚合消息,准备接收后续内容。
@Override
protected FullHttpMessage<?> beginAggregation(BufferAllocator allocator, HttpMessage start) throws Exception {
assert !(start instanceof FullHttpMessage);
// 移除 HTTP 消息头中的 Transfer-Encoding: chunked,以便后续使用聚合后的 Content-Length
HttpUtil.setTransferEncodingChunked(start, false);
final CompositeBuffer content = allocator.compose();
FullHttpMessage<?> ret;
// 根据消息类型创建对应的聚合消息并初始化其 payload 为一个可扩展的空 CompositeBuffer,用于后续追加内容块。
if (start instanceof HttpRequest) {
ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
} else if (start instanceof HttpResponse) {
ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
} else {
throw new Error();
}
return ret;
}
// 将内容块追加到聚合消息中,并在遇到最后一块时设置尾部头信息
@Override
protected void aggregate(BufferAllocator allocator, FullHttpMessage<?> aggregated, HttpContent<C> content) throws Exception {
final CompositeBuffer payload = (CompositeBuffer) aggregated.payload();
payload.extendWith(content.payload().send());
if (content instanceof LastHttpContent) {
((AggregatedFullHttpMessage<?>) aggregated).setTrailingHeaders(((LastHttpContent<?>) content).trailingHeaders());
}
}
// 完成聚合时,如果未设置 Content-Length,则自动设置为聚合内容的实际长度。
@Override
protected void finishAggregation(BufferAllocator allocator, FullHttpMessage<?> aggregated) throws Exception {
if (!HttpUtil.isContentLengthSet(aggregated)) {
aggregated.headers().set(CONTENT_LENGTH, String.valueOf(aggregated.payload().readableBytes()));
}
}
// 处理超大 HTTP 消息
@Override
protected void handleOversizedMessage(final ChannelHandlerContext ctx, Object oversized) throws Exception {
if (oversized instanceof HttpRequest) {
HttpRequest request = (HttpRequest) oversized;
// 条件1:如果是完整请求(FullHttpMessage)或者请求既不期待100-continue也不保持连接
if (oversized instanceof FullHttpMessage || !HttpUtil.is100ContinueExpected(request) && !HttpUtil.isKeepAlive(request)) {
// 发送带关闭连接指示的413错误响应
Future<Void> future = ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, true));
future.addListener(f -> {
if (f.isFailed()) {
// 日志打印发送失败的原因
logger.debug("Failed to send a 413 Request Entity Too Large.", f.cause());
}
// 响应发送后关闭连接
ctx.close();
});
} else {
// 条件2:请求期待100-continue或者保持连接时,发送不关闭连接的413响应
ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, false))
.addListener(future -> {
if (future.isFailed()) {
// 发送失败时日志记录并关闭连接
logger.debug("Failed to send a 413 Request Entity Too Large.", future.cause());
ctx.close();
}
});
}
} else if (oversized instanceof HttpResponse) {
// 如果是超大的响应,直接抛出异常,可能交由上层处理
throw new ResponseTooLargeException("Response entity too large: " + oversized);
} else {
// 既不是请求也不是响应,视为非法状态,抛异常
throw new IllegalStateException();
}
}
@Override
public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.channelExceptionCaught(ctx, cause);
if (cause instanceof ResponseTooLargeException) {
ctx.close();
}
}
// 该方法用于生成一个指定状态码的空响应,并根据参数决定是否关闭连接和设置内容长度
private static FullHttpResponse newErrorResponse(HttpResponseStatus status, BufferAllocator allocator, boolean emptyContent, boolean closeConnection) {
// 根据传入的状态码 status,创建一个空内容的 FullHttpResponse,
FullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, status, allocator.allocate(0));
// 如果 emptyContent 为 true,则设置响应头 Content-Length 为 0,
if (emptyContent) {
resp.headers().set(CONTENT_LENGTH, HttpHeaderValues.ZERO);
}
// 如果 closeConnection 为 true,则设置响应头 Connection: close,表示连接关闭。
if (closeConnection) {
resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
}
return resp;
}
}
AggregatedFullHttpMessage
private abstract static class AggregatedFullHttpMessage<R extends FullHttpMessage<R>> implements FullHttpMessage<R> {
protected final HttpMessage message;
private final Buffer payload;
private HttpHeaders trailingHeaders;
AggregatedFullHttpMessage(HttpMessage message, Buffer payload, HttpHeaders trailingHeaders) {
this.message = message;
this.payload = payload;
this.trailingHeaders = trailingHeaders;
}
@Override
public void close() {
payload.close();
}
@Override
public boolean isAccessible() {
return payload.isAccessible();
}
@Override
public Buffer payload() {
return payload;
}
@Override
public HttpHeaders trailingHeaders() {
HttpHeaders trailingHeaders = this.trailingHeaders;
return requireNonNullElse(trailingHeaders, HttpHeaders.emptyHeaders());
}
void setTrailingHeaders(HttpHeaders trailingHeaders) {
this.trailingHeaders = trailingHeaders;
}
@Override
public HttpVersion getProtocolVersion() {
return message.protocolVersion();
}
@Override
public HttpVersion protocolVersion() {
return message.protocolVersion();
}
@Override
public FullHttpMessage<R> setProtocolVersion(HttpVersion version) {
message.setProtocolVersion(version);
return this;
}
@Override
public HttpHeaders headers() {
return message.headers();
}
@Override
public DecoderResult decoderResult() {
return message.decoderResult();
}
@Override
public void setDecoderResult(DecoderResult result) {
message.setDecoderResult(result);
}
}
AggregatedFullHttpRequest
private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage<FullHttpRequest> implements FullHttpRequest {
AggregatedFullHttpRequest(HttpRequest request, Buffer content, HttpHeaders trailingHeaders) {
super(request, content, trailingHeaders);
}
@Override
public Send<FullHttpRequest> send() {
return payload().send().map(FullHttpRequest.class,
p -> new AggregatedFullHttpRequest(this, p, trailingHeaders()));
}
@Override
public AggregatedFullHttpRequest copy() {
return new AggregatedFullHttpRequest(this, payload().copy(), trailingHeaders().copy());
}
@Override
public FullHttpRequest touch(Object hint) {
payload().touch(hint);
return this;
}
@Override
public FullHttpRequest setMethod(HttpMethod method) {
((HttpRequest) message).setMethod(method);
return this;
}
@Override
public FullHttpRequest setUri(String uri) {
((HttpRequest) message).setUri(uri);
return this;
}
@Override
public HttpMethod method() {
return ((HttpRequest) message).method();
}
@Override
public String uri() {
return ((HttpRequest) message).uri();
}
@Override
public FullHttpRequest setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
@Override
public String toString() {
return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();
}
}
AggregatedFullHttpResponse
private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage<FullHttpResponse> implements FullHttpResponse {
AggregatedFullHttpResponse(HttpResponse message, Buffer content, HttpHeaders trailingHeaders) {
super(message, content, trailingHeaders);
}
@Override
public Send<FullHttpResponse> send() {
return payload().send().map(FullHttpResponse.class,
p -> new AggregatedFullHttpResponse(this, p, trailingHeaders()));
}
@Override
public AggregatedFullHttpResponse copy() {
return new AggregatedFullHttpResponse(this, payload().copy(), trailingHeaders().copy());
}
@Override
public FullHttpResponse touch(Object hint) {
payload().touch(hint);
return this;
}
@Override
public FullHttpResponse setStatus(HttpResponseStatus status) {
((HttpResponse) message).setStatus(status);
return this;
}
@Override
public HttpResponseStatus status() {
return ((HttpResponse) message).status();
}
@Override
public FullHttpResponse setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
@Override
public String toString() {
return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();
}
}