在阅读这篇文章前,推荐先阅读
- [netty5: ByteToMessageCodec & MessageToByteEncoder & ByteToMessageDecoder]-源码分析
- [netty5: HttpObject]-源码解析
100-continue
100-continue 是 HTTP/1.1 协议中的一种机制,用于客户端在发送大体积请求体(如文件上传)前,先向服务器发送一个带有 Expect: 100-continue 头的请求,询问服务器是否准备好接收请求体;服务器如果准备好了,会返回 100 Continue 响应,客户端才开始发送实际数据,从而避免不必要的大数据传输。
MessageAggregator
MessageAggregator<I, S, C, A> 是一个高度可复用的通用聚合器框架,适用于各种流式协议的“分段消息聚合”场景:
功能 | 说明 |
---|---|
启动聚合 | tryStartMessage() 检测到起始消息,初始化聚合 |
聚合中 | tryContentMessage() 检测到内容消息,append 内容 |
完成聚合 | isLastContentMessage() 判断是否是最后一块,调用 finishAggregation |
超长控制 | maxContentLength + lengthForContent 控制体积 |
特殊控制 | 支持 100-continue、异常处理、复用 context listener |
public abstract class MessageAggregator<I, S, C extends AutoCloseable, A extends AutoCloseable> extends MessageToMessageDecoder<I> {// 当前正在聚合的完整消息(例如 FullHttpRequest 或 FullHttpResponse)private A currentMessage;// 聚合内容允许的最大字节数,超过则触发 handleOversizedMessageprivate final int maxContentLength;// 标识当前是否正在处理超长消息,避免重复处理。private boolean handlingOversizedMessage;private ChannelHandlerContext ctx;// 用于监听 100-Continue 响应写入完成后的回调private FutureContextListener<ChannelHandlerContext, Void> continueResponseWriteListener;// 标识是否正在聚合过程中private boolean aggregating;// 是否在通道关闭时处理未完成的聚合private boolean handleIncompleteAggregateDuringClose = true;protected MessageAggregator(int maxContentLength) {this.maxContentLength = maxContentLength;}@Overridepublic boolean acceptInboundMessage(Object msg) throws Exception {if (!super.acceptInboundMessage(msg)) {return false;}if (isAggregated(msg)) {return false;}if (tryStartMessage(msg) != null) {return true;}return aggregating && tryContentMessage(msg) != null;}@Overrideprotected void decode(final ChannelHandlerContext ctx, I msg) throws Exception {// 1. 判断是否为新消息起始(startMsg)// 判断当前收到的 msg 是否为 HttpMessage,如果是,则开始处理聚合。final S startMsg = tryStartMessage(msg);if (startMsg != null) {aggregating = true;handlingOversizedMessage = false;// 如果已存在未完成的 currentMessage,说明消息异常,抛出 MessageAggregationExceptionif (currentMessage != null) {currentMessage.close();currentMessage = null;throw new MessageAggregationException();}// 2. 处理 100-continue 相关响应(continueResponse)// newContinueResponse 的核心逻辑:// - 如果请求头中包含 Expect: 100-continue,并且请求体大小没有超过 maxContentLength,则返回一个 100 Continue 响应;// - 如果请求体过大(Content-Length > maxContentLength),则返回一个 413 Request Entity Too Large 错误响应;// - 如果不符合任何条件,返回 null,表示不需要继续响应。Object continueResponse = newContinueResponse(startMsg, maxContentLength, ctx.pipeline());if (continueResponse != null) {// 构造监听器FutureContextListener<ChannelHandlerContext, Void> listener = continueResponseWriteListener;if (listener == null) {continueResponseWriteListener = listener = (context, future) -> {if (future.isFailed()) {context.fireChannelExceptionCaught(future.cause());}};}// 判断在收到 100-continue 响应后是否关闭连接,条件是配置了关闭标志且响应表示应忽略后续内容。boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);// // 判断响应是否为客户端错误(4xx),如果是,则说明应忽略后续内容。handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);// 写出响应并监听结果Future<Void> future = ctx.writeAndFlush(continueResponse).addListener(ctx, listener);if (closeAfterWrite) {handleIncompleteAggregateDuringClose = false;future.addListener(ctx, ChannelFutureListeners.CLOSE);return;}// 判断是否忽略后续内容if (handlingOversizedMessage) {return;}} else if (isContentLengthInvalid(startMsg, maxContentLength)) {// 3. 检查请求长度是否合法invokeHandleOversizedMessage(ctx, startMsg);return;}// 4. 处理起始消息中已有的解码失败情况if (startMsg instanceof DecoderResultProvider &&!((DecoderResultProvider) startMsg).decoderResult().isSuccess()) {final A aggregated = beginAggregation(ctx.bufferAllocator(), startMsg);finishAggregation(ctx.bufferAllocator(), aggregated);ctx.fireChannelRead(aggregated);return;}// 5. 初始化新消息聚合对象// 创建一个聚合消息实例(包含起始行和一个空的内容缓冲区),等待后续内容片段加入。currentMessage = beginAggregation(ctx.bufferAllocator(), startMsg);return;}// 6. 处理内容消息(contentMsg)// 先判断 msg 是否是消息体片段,如果不是则抛异常。final C contentMsg = tryContentMessage(msg);if (contentMsg != null) {// 如果还没有初始化的聚合消息(即没有起始消息),忽略该内容。if (currentMessage == null) {return;}// 检查聚合后长度是否超限,超限则调用超长消息处理。if (lengthForAggregation(currentMessage) > maxContentLength - lengthForContent(contentMsg)) {invokeHandleOversizedMessage(ctx, currentMessage);return;}// 调用 aggregate 将当前内容片段追加到聚合消息。aggregate(ctx.bufferAllocator(), currentMessage, contentMsg);final boolean last;// 检查是否为消息最后一片(last)if (contentMsg instanceof DecoderResultProvider) {DecoderResult decoderResult = ((DecoderResultProvider) contentMsg).decoderResult();if (!decoderResult.isSuccess()) {if (currentMessage instanceof DecoderResultProvider) {((DecoderResultProvider) currentMessage).setDecoderResult(DecoderResult.failure(decoderResult.cause()));}last = true;} else {last = isLastContentMessage(contentMsg);}} else {last = isLastContentMessage(contentMsg);}// 如果是,完成聚合,清理状态,向下游传递完整消息。if (last) {finishAggregation0(ctx.bufferAllocator(), currentMessage);// All doneA message = currentMessage;currentMessage = null;ctx.fireChannelRead(message);}} else {throw new MessageAggregationException();}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {if (currentMessage != null && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {ctx.read();}ctx.fireChannelReadComplete();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (aggregating && handleIncompleteAggregateDuringClose) {ctx.fireChannelExceptionCaught(new PrematureChannelClosureException("Channel closed while still aggregating message"));}try {super.channelInactive(ctx);} finally {releaseCurrentMessage();}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {this.ctx = ctx;}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {try {super.handlerRemoved(ctx);} finally {releaseCurrentMessage();}}protected final void releaseCurrentMessage() throws Exception {if (currentMessage != null) {currentMessage.close();currentMessage = null;}handlingOversizedMessage = false;aggregating = false;}// 省略抽象方法,具体看 HttpObjectAggregator
}
HttpObjectAggregator
HttpObjectAggregator
是构建高层 HTTP 服务的基础设施组件,它将分块的 HTTP 请求或响应组装为完整对象,从而简化上层应用逻辑。其设计清晰、可扩展性强,并充分考虑了 Expect: 100-continue 与 Content-Length 异常等 HTTP 协议边界情况,是非常值得借鉴的聚合处理器实现。
public class HttpObjectAggregator<C extends HttpContent<C>>extends MessageAggregator<HttpObject, HttpMessage, HttpContent<C>, FullHttpMessage<?>> {private static final Logger logger = LoggerFactory.getLogger(HttpObjectAggregator.class);// 当检测到客户端发送了 100-continue 期望但请求内容过大时,是否关闭连接;// 为 true 则直接关闭连接,防止浪费资源,// 为 false 则保持连接打开并继续读取和丢弃数据直到下一请求。private final boolean closeOnExpectationFailed;public HttpObjectAggregator(int maxContentLength) {this(maxContentLength, false);}public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {super(maxContentLength);this.closeOnExpectationFailed = closeOnExpectationFailed;}@Overrideprotected HttpMessage tryStartMessage(Object msg) {return msg instanceof HttpMessage ? (HttpMessage) msg : null;}@SuppressWarnings("unchecked")@Overrideprotected HttpContent<C> tryContentMessage(Object msg) {return msg instanceof HttpContent ? (HttpContent<C>) msg : null;}@Overrideprotected boolean isAggregated(Object msg) throws Exception {return msg instanceof FullHttpMessage;}@Overrideprotected int lengthForContent(HttpContent<C> msg) {return msg.payload().readableBytes();}@Overrideprotected int lengthForAggregation(FullHttpMessage<?> msg) {return msg.payload().readableBytes();}@Overrideprotected boolean isLastContentMessage(HttpContent<C> msg) throws Exception {return msg instanceof LastHttpContent;}@Overrideprotected boolean isContentLengthInvalid(HttpMessage start, int maxContentLength) {try {return getContentLength(start, -1L) > maxContentLength;} catch (final NumberFormatException e) {return false;}}// 根据请求的 Expectation 头判断是否返回 100 Continue 或错误响应(如 417 或 413),并在不支持或内容过大时触发相应事件private static FullHttpResponse continueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {// 根据请求的 Expectation 头判断是否支持期望,若不支持触发失败事件返回 417;if (HttpUtil.isUnsupportedExpectation(start)) {pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);return newErrorResponse(EXPECTATION_FAILED, pipeline.channel().bufferAllocator(), true, false);}if (HttpUtil.is100ContinueExpected(start)) {// 若期望 100-continue 且内容长度未超限,返回 100 Continue 响应,if (getContentLength(start, -1L) <= maxContentLength) {return newErrorResponse(CONTINUE, pipeline.channel().bufferAllocator(), false, false);}// 否则触发失败事件并返回 413 请求体过大响应。pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);return newErrorResponse(REQUEST_ENTITY_TOO_LARGE, pipeline.channel().bufferAllocator(), true, false);}return null;}// 根据请求创建一个 100-continue 响应,并在响应生成后移除请求中的 Expect 头。@Overrideprotected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {FullHttpResponse response = continueResponse(start, maxContentLength, pipeline);if (response != null) {start.headers().remove(EXPECT);}return response;}// 判断在收到 100-continue 响应后是否关闭连接,条件是配置了关闭标志且响应表示应忽略后续内容。@Overrideprotected boolean closeAfterContinueResponse(Object msg) {return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);}// 判断响应是否为客户端错误(4xx),如果是,则说明应忽略后续内容。@Overrideprotected boolean ignoreContentAfterContinueResponse(Object msg) {if (msg instanceof HttpResponse) {final HttpResponse httpResponse = (HttpResponse) msg;return httpResponse.status().codeClass() == HttpStatusClass.CLIENT_ERROR;}return false;}// 开始对一个非完整的 HTTP 消息进行聚合,移除分块传输编码标记,并创建一个对应请求或响应类型的空聚合消息,准备接收后续内容。@Overrideprotected FullHttpMessage<?> beginAggregation(BufferAllocator allocator, HttpMessage start) throws Exception {assert !(start instanceof FullHttpMessage);// 移除 HTTP 消息头中的 Transfer-Encoding: chunked,以便后续使用聚合后的 Content-LengthHttpUtil.setTransferEncodingChunked(start, false);final CompositeBuffer content = allocator.compose();FullHttpMessage<?> ret;// 根据消息类型创建对应的聚合消息并初始化其 payload 为一个可扩展的空 CompositeBuffer,用于后续追加内容块。if (start instanceof HttpRequest) {ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);} else if (start instanceof HttpResponse) {ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);} else {throw new Error();}return ret;}// 将内容块追加到聚合消息中,并在遇到最后一块时设置尾部头信息@Overrideprotected void aggregate(BufferAllocator allocator, FullHttpMessage<?> aggregated, HttpContent<C> content) throws Exception {final CompositeBuffer payload = (CompositeBuffer) aggregated.payload();payload.extendWith(content.payload().send());if (content instanceof LastHttpContent) {((AggregatedFullHttpMessage<?>) aggregated).setTrailingHeaders(((LastHttpContent<?>) content).trailingHeaders());}}// 完成聚合时,如果未设置 Content-Length,则自动设置为聚合内容的实际长度。@Overrideprotected void finishAggregation(BufferAllocator allocator, FullHttpMessage<?> aggregated) throws Exception {if (!HttpUtil.isContentLengthSet(aggregated)) {aggregated.headers().set(CONTENT_LENGTH, String.valueOf(aggregated.payload().readableBytes()));}}// 处理超大 HTTP 消息@Overrideprotected void handleOversizedMessage(final ChannelHandlerContext ctx, Object oversized) throws Exception {if (oversized instanceof HttpRequest) {HttpRequest request = (HttpRequest) oversized;// 条件1:如果是完整请求(FullHttpMessage)或者请求既不期待100-continue也不保持连接if (oversized instanceof FullHttpMessage || !HttpUtil.is100ContinueExpected(request) && !HttpUtil.isKeepAlive(request)) {// 发送带关闭连接指示的413错误响应Future<Void> future = ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, true));future.addListener(f -> {if (f.isFailed()) {// 日志打印发送失败的原因logger.debug("Failed to send a 413 Request Entity Too Large.", f.cause());}// 响应发送后关闭连接ctx.close();});} else {// 条件2:请求期待100-continue或者保持连接时,发送不关闭连接的413响应ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, false)).addListener(future -> {if (future.isFailed()) {// 发送失败时日志记录并关闭连接logger.debug("Failed to send a 413 Request Entity Too Large.", future.cause());ctx.close();}});}} else if (oversized instanceof HttpResponse) {// 如果是超大的响应,直接抛出异常,可能交由上层处理throw new ResponseTooLargeException("Response entity too large: " + oversized);} else {// 既不是请求也不是响应,视为非法状态,抛异常throw new IllegalStateException();}}@Overridepublic void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.channelExceptionCaught(ctx, cause);if (cause instanceof ResponseTooLargeException) {ctx.close();}}// 该方法用于生成一个指定状态码的空响应,并根据参数决定是否关闭连接和设置内容长度private static FullHttpResponse newErrorResponse(HttpResponseStatus status, BufferAllocator allocator, boolean emptyContent, boolean closeConnection) {// 根据传入的状态码 status,创建一个空内容的 FullHttpResponse,FullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, status, allocator.allocate(0));// 如果 emptyContent 为 true,则设置响应头 Content-Length 为 0,if (emptyContent) {resp.headers().set(CONTENT_LENGTH, HttpHeaderValues.ZERO);}// 如果 closeConnection 为 true,则设置响应头 Connection: close,表示连接关闭。if (closeConnection) {resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);}return resp;}
}
AggregatedFullHttpMessage
private abstract static class AggregatedFullHttpMessage<R extends FullHttpMessage<R>> implements FullHttpMessage<R> {protected final HttpMessage message;private final Buffer payload;private HttpHeaders trailingHeaders;AggregatedFullHttpMessage(HttpMessage message, Buffer payload, HttpHeaders trailingHeaders) {this.message = message;this.payload = payload;this.trailingHeaders = trailingHeaders;}@Overridepublic void close() {payload.close();}@Overridepublic boolean isAccessible() {return payload.isAccessible();}@Overridepublic Buffer payload() {return payload;}@Overridepublic HttpHeaders trailingHeaders() {HttpHeaders trailingHeaders = this.trailingHeaders;return requireNonNullElse(trailingHeaders, HttpHeaders.emptyHeaders());}void setTrailingHeaders(HttpHeaders trailingHeaders) {this.trailingHeaders = trailingHeaders;}@Overridepublic HttpVersion getProtocolVersion() {return message.protocolVersion();}@Overridepublic HttpVersion protocolVersion() {return message.protocolVersion();}@Overridepublic FullHttpMessage<R> setProtocolVersion(HttpVersion version) {message.setProtocolVersion(version);return this;}@Overridepublic HttpHeaders headers() {return message.headers();}@Overridepublic DecoderResult decoderResult() {return message.decoderResult();}@Overridepublic void setDecoderResult(DecoderResult result) {message.setDecoderResult(result);}
}
AggregatedFullHttpRequest
private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage<FullHttpRequest> implements FullHttpRequest {AggregatedFullHttpRequest(HttpRequest request, Buffer content, HttpHeaders trailingHeaders) {super(request, content, trailingHeaders);}@Overridepublic Send<FullHttpRequest> send() {return payload().send().map(FullHttpRequest.class,p -> new AggregatedFullHttpRequest(this, p, trailingHeaders()));}@Overridepublic AggregatedFullHttpRequest copy() {return new AggregatedFullHttpRequest(this, payload().copy(), trailingHeaders().copy());}@Overridepublic FullHttpRequest touch(Object hint) {payload().touch(hint);return this;}@Overridepublic FullHttpRequest setMethod(HttpMethod method) {((HttpRequest) message).setMethod(method);return this;}@Overridepublic FullHttpRequest setUri(String uri) {((HttpRequest) message).setUri(uri);return this;}@Overridepublic HttpMethod method() {return ((HttpRequest) message).method();}@Overridepublic String uri() {return ((HttpRequest) message).uri();}@Overridepublic FullHttpRequest setProtocolVersion(HttpVersion version) {super.setProtocolVersion(version);return this;}@Overridepublic String toString() {return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();}
}
AggregatedFullHttpResponse
private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage<FullHttpResponse> implements FullHttpResponse {AggregatedFullHttpResponse(HttpResponse message, Buffer content, HttpHeaders trailingHeaders) {super(message, content, trailingHeaders);}@Overridepublic Send<FullHttpResponse> send() {return payload().send().map(FullHttpResponse.class,p -> new AggregatedFullHttpResponse(this, p, trailingHeaders()));}@Overridepublic AggregatedFullHttpResponse copy() {return new AggregatedFullHttpResponse(this, payload().copy(), trailingHeaders().copy());}@Overridepublic FullHttpResponse touch(Object hint) {payload().touch(hint);return this;}@Overridepublic FullHttpResponse setStatus(HttpResponseStatus status) {((HttpResponse) message).setStatus(status);return this;}@Overridepublic HttpResponseStatus status() {return ((HttpResponse) message).status();}@Overridepublic FullHttpResponse setProtocolVersion(HttpVersion version) {super.setProtocolVersion(version);return this;}@Overridepublic String toString() {return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();}
}