ChunkedInput
ChunkedInput<B>
是 Netty 中用于按块读取不定长数据流的接口,常配合 ChunkedWriteHandler
实现流式写入,支持如文件、流、HTTP 和 WebSocket 等多种数据源。
实现类 | 简要说明 |
---|---|
ChunkedFile | 用于将常规文件按块传输(使用传统阻塞 IO)。 |
ChunkedNioFile | 用于将 FileChannel 形式的文件通过 NIO 按块传输。 |
ChunkedNioStream | 将 ReadableByteChannel 数据源作为块状输入,适用于 NIO 输入流。 |
ChunkedStream | 将 InputStream (阻塞流)按块读取并传输。 |
Http2DataChunkedInput | 专为 HTTP/2 数据帧的按块输入设计,用于发送 DATA 帧。 |
HttpChunkedInput | 将 HttpContent 对象(如文件块)封装为支持 trailer 的块状 HTTP 输出。 |
WebSocketChunkedInput | 用于 WebSocket 数据帧分块传输,支持大帧拆分成多个 WebSocket 帧。 |
public interface ChunkedInput<B> extends AutoCloseable {// 是否已经读取到输入流末尾boolean isEndOfInput() throws Exception;// 从数据流中读取下一段数据块(chunk)B readChunk(BufferAllocator allocator) throws Exception;// 返回整个输入源的长度(如果已知)long length();// 返回目前已经“传输”的字节数long progress();
}
ChunkedWriteHandler
ChunkedWriteHandler 是 Netty 中用于分块写入大数据流(如文件、视频流等)的处理器,核心职责是将大数据拆成小块逐步异步写入,避免一次性占用大量内存,提高传输效率和系统稳定性。主要特点和功能包括:
- 分块写入:支持 ChunkedInput 类型的数据流,按块读取并写入,适合无法一次性全部加载到内存的大数据。
- 异步处理:内部维护一个待写队列(PendingWrite),通过事件驱动机制逐块写出数据,保证非阻塞的高效传输。
- 资源管理:在写入完成或异常关闭时,会自动关闭数据流,释放资源,防止内存泄漏。
- 错误处理:遇到写入失败或通道关闭时,能正确通知每个待写任务失败,并清理队列。
- 流控制:自动管理写请求,避免写入过快导致拥塞,通过事件循环调度写操作。
public class ChunkedWriteHandler implements ChannelHandler {private static final Logger logger = LoggerFactory.getLogger(ChunkedWriteHandler.class);private Queue<PendingWrite> queue;private volatile ChannelHandlerContext ctx;public ChunkedWriteHandler() {}private void allocateQueue() {if (queue == null) {queue = new ArrayDeque<>();}}private boolean queueIsEmpty() {return queue == null || queue.isEmpty();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {this.ctx = ctx;}public void resumeTransfer() {final ChannelHandlerContext ctx = this.ctx;if (ctx == null) {return;}if (ctx.executor().inEventLoop()) {resumeTransfer0(ctx);} else {ctx.executor().execute(() -> resumeTransfer0(ctx));}}private void resumeTransfer0(ChannelHandlerContext ctx) {try {doFlush(ctx);} catch (Exception e) {logger.warn("Unexpected exception while sending chunks.", e);}}// 如果当前有待写队列(queue)不为空,或者写入的消息是 ChunkedInput(分块数据流),则将写操作封装为 PendingWrite 并加入队列,返回对应的 Future(异步写结果)。// 否则直接调用下游的 ctx.write(msg)。@Overridepublic Future<Void> write(ChannelHandlerContext ctx, Object msg) {if (!queueIsEmpty() || msg instanceof ChunkedInput) {allocateQueue();Promise<Void> promise = ctx.newPromise();queue.add(new PendingWrite(msg, promise));return promise.asFuture();} else {return ctx.write(msg);}}@Overridepublic void flush(ChannelHandlerContext ctx) {doFlush(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {doFlush(ctx);ctx.fireChannelInactive();}// 实现基于通道写缓冲区状态的流控,防止写过快导致内存溢出@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isWritable()) {doFlush(ctx);}ctx.fireChannelWritabilityChanged();}// 从 queue 中依次取出 PendingWrite,若是 ChunkedInput 则判断是否写完,关闭资源,并根据情况调用 fail() 或 success()。private void discard(Throwable cause) {if (queueIsEmpty()) {return;}for (;;) {PendingWrite currentWrite = queue.poll();if (currentWrite == null) {break;}Object message = currentWrite.msg;if (message instanceof ChunkedInput) {ChunkedInput<?> in = (ChunkedInput<?>) message;boolean endOfInput;try {endOfInput = in.isEndOfInput();closeInput(in);} catch (Exception e) {closeInput(in);currentWrite.fail(e);logger.warn("ChunkedInput failed", e);continue;}if (!endOfInput) {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);} else {currentWrite.success();}} else {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);}}}private void doFlush(final ChannelHandlerContext ctx) {final Channel channel = ctx.channel();// 如果通道不活跃(比如已关闭),调用 discard(null) 清空队列,释放资源,// 随后调用 ctx.flush() 以确保之前写过但未刷新的数据也被处理,最后直接返回。if (!channel.isActive()) {discard(null);ctx.flush();return;}// 如果待写队列为空,直接调用 flush(),然后返回。if (queueIsEmpty()) {ctx.flush();return;}// 标记是否最终需要调用 flush()。boolean requiresFlush = true;// 获取缓冲区分配器,用于分配内存。BufferAllocator allocator = ctx.bufferAllocator();// 只要通道可写(写缓冲区未满),就尝试写数据。防止写入过快导致拥塞。while (channel.isWritable()) {// 从队列头获取当前待写项(不移除)final PendingWrite currentWrite = queue.peek();if (currentWrite == null) {break;}// 如果当前待写的 promise 已经完成(可能之前写失败了),直接移除该项并继续处理下一个。if (currentWrite.promise.isDone()) {queue.remove();continue;}final Object pendingMessage = currentWrite.msg;// 判断当前待写消息是否是 ChunkedInput 类型(分块输入流)。if (pendingMessage instanceof ChunkedInput) {final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;boolean endOfInput;boolean suspend;Object message = null;try {// 从 ChunkedInput 中读取下一块数据,分配内存。message = chunks.readChunk(allocator);// 判断是否读取到输入末尾。endOfInput = chunks.isEndOfInput();// suspend 标记如果当前块是 null 且未到末尾,说明暂时没有数据可写,需要挂起等待更多数据。suspend = message == null && !endOfInput;} catch (final Throwable t) {// 如果读取过程异常,清理资源,调用失败回调,并跳出循环。queue.remove();if (message != null) {Resource.dispose(message);}closeInput(chunks);currentWrite.fail(t);break;}// 如果需要挂起等待数据,则退出写入循环。if (suspend) {break;}// 如果块数据为空(null),则分配一个空缓冲区写入,防止写空消息时出现问题。if (message == null) {message = allocator.allocate(0);}// 如果已到输入末尾,移除队列当前项。if (endOfInput) {queue.remove();}// 写入当前块并立即刷新。Future<Void> f = ctx.writeAndFlush(message);// 如果已经到末尾,写完成后调用 handleEndOfInputFuture 处理关闭输入流、通知完成等。if (endOfInput) {if (f.isDone()) {handleEndOfInputFuture(f, chunks, currentWrite);} else {f.addListener(future -> handleEndOfInputFuture(future, chunks, currentWrite));}} else {// 如果未到末尾,调用 handleFuture 处理写入完成后的逻辑,比如继续写或暂停写。final boolean resume = !channel.isWritable();if (f.isDone()) {handleFuture(channel, f, chunks, currentWrite, resume);} else {f.addListener(future -> handleFuture(channel, future, chunks, currentWrite, resume));}}// 由于已经调用了 writeAndFlush,此时不需要额外再调用 flush()。requiresFlush = false;} else {// 非 ChunkedInput 处理// 对于非分块输入,直接从队列移除,调用 write() 异步写入,并将结果与当前 promise 关联。// 标记需要最后调用 flush()。queue.remove();ctx.write(pendingMessage).cascadeTo(currentWrite.promise);requiresFlush = true;}// 每写完一个任务后检测通道状态,若关闭,则调用 discard 清理剩余队列,退出循环。if (!channel.isActive()) {discard(new ClosedChannelException());break;}}// 如果循环中没主动调用 flush(),则最后统一调用。if (requiresFlush) {ctx.flush();}}// 在最后一个 chunk 写完后调用: 处理 ChunkedInput 写入完成后的清理逻辑private static void handleEndOfInputFuture(Future<?> future, ChunkedInput<?> input, PendingWrite currentWrite) {closeInput(input);if (future.isFailed()) {currentWrite.fail(future.cause());} else {currentWrite.success();}}// 处理 非末尾 chunk 写入完成后的回调逻辑, 根据是否写入成功、是否需要继续写,来决定是否恢复 chunk 的发送private void handleFuture(Channel channel, Future<?> future, ChunkedInput<?> input,PendingWrite currentWrite, boolean resume) {if (future.isFailed()) {closeInput(input);currentWrite.fail(future.cause());} else {if (resume && channel.isWritable()) {resumeTransfer();}}}// 关闭分块输入流,捕获并记录异常。private static void closeInput(ChunkedInput<?> chunks) {try {chunks.close();} catch (Throwable t) {logger.warn("Failed to close a ChunkedInput.", t);}}}
ChunkedWriteHandler.PendingWrite
PendingWrite 是一个写任务的封装器,绑定了待写入数据和写完成通知,是实现异步分块写入的基础数据结构。
private static final class PendingWrite {final Object msg;final Promise<Void> promise;PendingWrite(Object msg, Promise<Void> promise) {this.msg = msg;this.promise = promise;}void fail(Throwable cause) {promise.tryFailure(cause);if (Resource.isAccessible(msg, false)) {SilentDispose.dispose(msg, logger);}}void success() {if (promise.isDone()) {return;}promise.trySuccess(null);}
}