在阅读该篇文章之前,推荐先阅读以下内容:
- [netty5: ChannelHandler & ChannelHandlerAdapter]-源码解析
- [netty5: HttpObjectEncoder & HttpObjectDecoder]-源码解析
HttpServerCodec
HttpServerCodec
是一个 Netty 编解码器,结合 HttpRequestDecoder
和 HttpResponseEncoder
,用于处理 HTTP 请求和响应,并支持协议升级。
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder> implements HttpServerUpgradeHandler.SourceCodec {private final Queue<HttpMethod> queue = new ArrayDeque<>();public HttpServerCodec() {// 4096 8192this(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE);}public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize) {init(new HttpServerRequestDecoder(maxInitialLineLength, maxHeaderSize),new HttpServerResponseEncoder());}@Overridepublic void upgradeFrom(ChannelHandlerContext ctx) {ctx.pipeline().remove(this);}
}
HttpServerRequestDecoder
HttpServerRequestDecoder 主要用于解码 HTTP 请求,并在解码时通过 queue 队列保存请求方法,以便在后续处理响应时进行配对。
private final class HttpServerRequestDecoder extends HttpRequestDecoder {private ChannelHandlerContext context;HttpServerRequestDecoder(int maxInitialLineLength, int maxHeaderSize) {super(maxInitialLineLength, maxHeaderSize);}HttpServerRequestDecoder(HttpDecoderConfig config) {super(config);}@Overrideprotected void decode(final ChannelHandlerContext ctx, Buffer buffer) throws Exception {super.decode(context, buffer);}@Overrideprotected void handlerAdded0(final ChannelHandlerContext ctx) {context = new DelegatingChannelHandlerContext(ctx) {@Overridepublic ChannelHandlerContext fireChannelRead(Object msg) {if (msg instanceof HttpRequest) {// 存放请求方法queue.add(((HttpRequest) msg).method());}super.fireChannelRead(msg);return this;}};}
}
HttpServerResponseEncoder
HttpServerResponseEncoder
负责处理 HTTP 响应编码,移除特定头部并根据请求方法决定响应内容是否为空。
private final class HttpServerResponseEncoder extends HttpResponseEncoder {private HttpMethod method;@Overrideprotected void sanitizeHeadersBeforeEncode(HttpResponse msg, boolean isAlwaysEmpty) {if (!isAlwaysEmpty && HttpMethod.CONNECT.equals(method) && msg.status().codeClass() == HttpStatusClass.SUCCESS) {msg.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);return;}super.sanitizeHeadersBeforeEncode(msg, isAlwaysEmpty);}// 判断 HTTP 响应是否总是没有内容@Overrideprotected boolean isContentAlwaysEmpty(@SuppressWarnings("unused") HttpResponse msg) {// 响应完成,先查看当前响应的请求方法,如果是HEAD, 响应永远无内容,直接返回 truemethod = queue.poll();return HttpMethod.HEAD.equals(method) || super.isContentAlwaysEmpty(msg);}
}
HttpClientCodec
HttpClientCodec
是一个 Netty 编解码器,结合 HttpResponseDecoder
和 HttpRequestEncoder
,支持请求与响应的关联、连接升级处理,并提供配置选项来控制解析行为和响应缺失时的错误处理。
public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder> implements HttpClientUpgradeHandler.SourceCodec {public static final boolean DEFAULT_FAIL_ON_MISSING_RESPONSE = false;public static final boolean DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST = false;// 用于控制在关闭通道时,请求数量 != 响应数量时,是否抛出异常。// 触发在 channelInactive, 如果为true, requestResponseCounter < 0 则抛出异常,否则忽略private final boolean failOnMissingResponse;// 用于跟踪 请求与响应对的数量 的计数器,具体来说,它用于 跟踪缺失的响应 数量。private final AtomicLong requestResponseCounter = new AtomicLong();// 用于指示是否在处理 CONNECT 请求后继续解析 HTTP 内容。// 如果为 true,则会在 CONNECT 请求后继续解析 HTTP 数据;// 如果为 false,则直接跳过解析private final boolean parseHttpAfterConnectRequest;// 用于指示解码是否停止,如果为true,则解码过程会被跳过,数据将直接传递下去。private boolean done;private final Queue<HttpMethod> queue = new ArrayDeque<>();public HttpClientCodec() {this(new HttpDecoderConfig());}public HttpClientCodec(HttpDecoderConfig config) {this(config, DEFAULT_FAIL_ON_MISSING_RESPONSE, DEFAULT_PARSE_HTTP_AFTER_CONNECT_REQUEST);}public HttpClientCodec(HttpDecoderConfig config, boolean failOnMissingResponse, boolean parseHttpAfterConnectRequest) {init(new Decoder(config), new Encoder());this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;this.failOnMissingResponse = failOnMissingResponse;}@Overridepublic void prepareUpgradeFrom(ChannelHandlerContext ctx) {((Encoder) outboundHandler()).upgraded = true;}@Overridepublic void upgradeFrom(ChannelHandlerContext ctx) {final ChannelPipeline p = ctx.pipeline();p.remove(this);}public void setSingleDecode(boolean singleDecode) {inboundHandler().setSingleDecode(singleDecode);}public boolean isSingleDecode() {return inboundHandler().isSingleDecode();}
}
Encoder
Encoder 是 HttpRequestEncoder 的子类,用于编码 HTTP 请求。当协议已升级时,直接将消息添加到输出队列;否则,将 HTTP 请求的方法加入队列,并继续进行编码。如果启用了 failOnMissingResponse 且响应尚未完成,则会在处理完 LastHttpContent 后增加请求响应计数。
private final class Encoder extends HttpRequestEncoder {boolean upgraded;@Overrideprotected void encodeAndClose(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {// 当连接已经升级(比如从 HTTP 升级到 WebSocket 或其他协议),此时不需要再进行 HTTP 编码,直接将消息传递下去if (upgraded) {out.add(msg);return;}if (msg instanceof HttpRequest) {queue.offer(((HttpRequest) msg).method());}super.encodeAndClose(ctx, msg, out);// failOnMissingResponse 在缺少响应是报错,并且done未完成,msg是最后一段内容,此时统计// 否则跳过执行if (failOnMissingResponse && !done) {if (msg instanceof LastHttpContent) {// 当响应到达时,requestResponseCounter 会减少计数,表示某个请求已经得到了响应requestResponseCounter.incrementAndGet();}}}
}
Decoder
Decoder 是 HttpResponseDecoder 的子类,用于解码 HTTP 响应,并在协议升级时管理请求响应的匹配;在响应解码时处理特定状态码(如 HEAD 请求和 CONNECT 请求的响应),并通过队列同步请求和响应。此外,Decoder 支持在连接关闭时检查未匹配的响应并抛出异常。
private final class Decoder extends HttpResponseDecoder {private ChannelHandlerContext context;Decoder(HttpDecoderConfig config) {super(config);}@Overrideprotected void handlerAdded0(ChannelHandlerContext ctx) {// ctx = ByteToMessageDecoder.ByteToMessageDecoderContextif (failOnMissingResponse) {context = new DelegatingChannelHandlerContext(ctx) {@Overridepublic ChannelHandlerContext fireChannelRead(Object msg) {decrement(msg);super.fireChannelRead(msg);return this;}};} else {context = ctx;}}@Overrideprotected void decode(ChannelHandlerContext ctx, Buffer buffer) throws Exception {if (done) {int readable = actualReadableBytes();if (readable == 0) {return;}ctx.fireChannelRead(buffer.readSplit(readable));} else {super.decode(context, buffer);}}private void decrement(Object msg) {if (msg == null) {return;}// 每当收到一个请求并且没有收到相应的响应时,requestResponseCounter 会增加一个计数值,表示有一个请求缺少响应if (msg instanceof LastHttpContent) {requestResponseCounter.decrementAndGet();}}// 判断消息是否包含内容,特别针对一些特殊的 HTTP 方法和状态码(如 HEAD 请求、CONNECT 请求的响应)@Overrideprotected boolean isContentAlwaysEmpty(HttpMessage msg) {final HttpResponseStatus status = ((HttpResponse) msg).status();final HttpStatusClass statusClass = status.codeClass();final int statusCode = status.code();if (statusClass == HttpStatusClass.INFORMATIONAL) {return super.isContentAlwaysEmpty(msg);}HttpMethod method = queue.poll(); if (method != null) {char firstChar = method.name().charAt(0);switch (firstChar) {case 'H':if (HttpMethod.HEAD.equals(method)) {return true;}break;case 'C':// 协议升级成功if (statusCode == 200) {if (HttpMethod.CONNECT.equals(method)) {// 一旦 CONNECT 请求成功建立连接后,不再进行 HTTP 响应解析,直接跳过// 通常适用于已经成功建立隧道后,不再需要处理 HTTP 相关内容的情况(例如,HTTPS 或 WebSocket)if (!parseHttpAfterConnectRequest) {done = true;queue.clear();}return true;}}break;default:break;}}return super.isContentAlwaysEmpty(msg);}// 如果在关闭连接时发现有未匹配的请求(即计数器值大于零),则可以触发异常,表明有请求缺少响应。@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);if (failOnMissingResponse) {long missingResponses = requestResponseCounter.get();if (missingResponses > 0) {ctx.fireChannelExceptionCaught(new PrematureChannelClosureException("channel gone inactive with " + missingResponses +" missing response(s)"));}}}
}