[netty5: ChunkedInput ChunkedWriteHandler]-源码分析

ChunkedInput

ChunkedInput<B> 是 Netty 中用于按块读取不定长数据流的接口,常配合 ChunkedWriteHandler 实现流式写入,支持如文件、流、HTTP 和 WebSocket 等多种数据源。

实现类简要说明
ChunkedFile用于将常规文件按块传输(使用传统阻塞 IO)。
ChunkedNioFile用于将 FileChannel 形式的文件通过 NIO 按块传输。
ChunkedNioStreamReadableByteChannel 数据源作为块状输入,适用于 NIO 输入流。
ChunkedStreamInputStream(阻塞流)按块读取并传输。
Http2DataChunkedInput专为 HTTP/2 数据帧的按块输入设计,用于发送 DATA 帧。
HttpChunkedInputHttpContent 对象(如文件块)封装为支持 trailer 的块状 HTTP 输出。
WebSocketChunkedInput用于 WebSocket 数据帧分块传输,支持大帧拆分成多个 WebSocket 帧。
public interface ChunkedInput<B> extends AutoCloseable {// 是否已经读取到输入流末尾boolean isEndOfInput() throws Exception;// 从数据流中读取下一段数据块(chunk)B readChunk(BufferAllocator allocator) throws Exception;// 返回整个输入源的长度(如果已知)long length();// 返回目前已经“传输”的字节数long progress();
}

ChunkedWriteHandler

ChunkedWriteHandler 是 Netty 中用于分块写入大数据流(如文件、视频流等)的处理器,核心职责是将大数据拆成小块逐步异步写入,避免一次性占用大量内存,提高传输效率和系统稳定性。主要特点和功能包括:

  1. 分块写入:支持 ChunkedInput 类型的数据流,按块读取并写入,适合无法一次性全部加载到内存的大数据。
  2. 异步处理:内部维护一个待写队列(PendingWrite),通过事件驱动机制逐块写出数据,保证非阻塞的高效传输。
  3. 资源管理:在写入完成或异常关闭时,会自动关闭数据流,释放资源,防止内存泄漏。
  4. 错误处理:遇到写入失败或通道关闭时,能正确通知每个待写任务失败,并清理队列。
  5. 流控制:自动管理写请求,避免写入过快导致拥塞,通过事件循环调度写操作。
public class ChunkedWriteHandler implements ChannelHandler {private static final Logger logger = LoggerFactory.getLogger(ChunkedWriteHandler.class);private Queue<PendingWrite> queue;private volatile ChannelHandlerContext ctx;public ChunkedWriteHandler() {}private void allocateQueue() {if (queue == null) {queue = new ArrayDeque<>();}}private boolean queueIsEmpty() {return queue == null || queue.isEmpty();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {this.ctx = ctx;}public void resumeTransfer() {final ChannelHandlerContext ctx = this.ctx;if (ctx == null) {return;}if (ctx.executor().inEventLoop()) {resumeTransfer0(ctx);} else {ctx.executor().execute(() -> resumeTransfer0(ctx));}}private void resumeTransfer0(ChannelHandlerContext ctx) {try {doFlush(ctx);} catch (Exception e) {logger.warn("Unexpected exception while sending chunks.", e);}}// 如果当前有待写队列(queue)不为空,或者写入的消息是 ChunkedInput(分块数据流),则将写操作封装为 PendingWrite 并加入队列,返回对应的 Future(异步写结果)。// 否则直接调用下游的 ctx.write(msg)。@Overridepublic Future<Void> write(ChannelHandlerContext ctx, Object msg) {if (!queueIsEmpty() || msg instanceof ChunkedInput) {allocateQueue();Promise<Void> promise = ctx.newPromise();queue.add(new PendingWrite(msg, promise));return promise.asFuture();} else {return ctx.write(msg);}}@Overridepublic void flush(ChannelHandlerContext ctx) {doFlush(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {doFlush(ctx);ctx.fireChannelInactive();}// 实现基于通道写缓冲区状态的流控,防止写过快导致内存溢出@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isWritable()) {doFlush(ctx);}ctx.fireChannelWritabilityChanged();}// 从 queue 中依次取出 PendingWrite,若是 ChunkedInput 则判断是否写完,关闭资源,并根据情况调用 fail() 或 success()。private void discard(Throwable cause) {if (queueIsEmpty()) {return;}for (;;) {PendingWrite currentWrite = queue.poll();if (currentWrite == null) {break;}Object message = currentWrite.msg;if (message instanceof ChunkedInput) {ChunkedInput<?> in = (ChunkedInput<?>) message;boolean endOfInput;try {endOfInput = in.isEndOfInput();closeInput(in);} catch (Exception e) {closeInput(in);currentWrite.fail(e);logger.warn("ChunkedInput failed", e);continue;}if (!endOfInput) {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);} else {currentWrite.success();}} else {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);}}}private void doFlush(final ChannelHandlerContext ctx) {final Channel channel = ctx.channel();// 如果通道不活跃(比如已关闭),调用 discard(null) 清空队列,释放资源,// 随后调用 ctx.flush() 以确保之前写过但未刷新的数据也被处理,最后直接返回。if (!channel.isActive()) {discard(null);ctx.flush();return;}// 如果待写队列为空,直接调用 flush(),然后返回。if (queueIsEmpty()) {ctx.flush();return;}// 标记是否最终需要调用 flush()。boolean requiresFlush = true;// 获取缓冲区分配器,用于分配内存。BufferAllocator allocator = ctx.bufferAllocator();// 只要通道可写(写缓冲区未满),就尝试写数据。防止写入过快导致拥塞。while (channel.isWritable()) {// 从队列头获取当前待写项(不移除)final PendingWrite currentWrite = queue.peek();if (currentWrite == null) {break;}// 如果当前待写的 promise 已经完成(可能之前写失败了),直接移除该项并继续处理下一个。if (currentWrite.promise.isDone()) {queue.remove();continue;}final Object pendingMessage = currentWrite.msg;// 判断当前待写消息是否是 ChunkedInput 类型(分块输入流)。if (pendingMessage instanceof ChunkedInput) {final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;boolean endOfInput;boolean suspend;Object message = null;try {// 从 ChunkedInput 中读取下一块数据,分配内存。message = chunks.readChunk(allocator);// 判断是否读取到输入末尾。endOfInput = chunks.isEndOfInput();// suspend 标记如果当前块是 null 且未到末尾,说明暂时没有数据可写,需要挂起等待更多数据。suspend = message == null && !endOfInput;} catch (final Throwable t) {// 如果读取过程异常,清理资源,调用失败回调,并跳出循环。queue.remove();if (message != null) {Resource.dispose(message);}closeInput(chunks);currentWrite.fail(t);break;}// 如果需要挂起等待数据,则退出写入循环。if (suspend) {break;}// 如果块数据为空(null),则分配一个空缓冲区写入,防止写空消息时出现问题。if (message == null) {message = allocator.allocate(0);}// 如果已到输入末尾,移除队列当前项。if (endOfInput) {queue.remove();}// 写入当前块并立即刷新。Future<Void> f = ctx.writeAndFlush(message);// 如果已经到末尾,写完成后调用 handleEndOfInputFuture 处理关闭输入流、通知完成等。if (endOfInput) {if (f.isDone()) {handleEndOfInputFuture(f, chunks, currentWrite);} else {f.addListener(future -> handleEndOfInputFuture(future, chunks, currentWrite));}} else {// 如果未到末尾,调用 handleFuture 处理写入完成后的逻辑,比如继续写或暂停写。final boolean resume = !channel.isWritable();if (f.isDone()) {handleFuture(channel, f, chunks, currentWrite, resume);} else {f.addListener(future -> handleFuture(channel, future, chunks, currentWrite, resume));}}// 由于已经调用了 writeAndFlush,此时不需要额外再调用 flush()。requiresFlush = false;} else {// 非 ChunkedInput 处理// 对于非分块输入,直接从队列移除,调用 write() 异步写入,并将结果与当前 promise 关联。// 标记需要最后调用 flush()。queue.remove();ctx.write(pendingMessage).cascadeTo(currentWrite.promise);requiresFlush = true;}// 每写完一个任务后检测通道状态,若关闭,则调用 discard 清理剩余队列,退出循环。if (!channel.isActive()) {discard(new ClosedChannelException());break;}}// 如果循环中没主动调用 flush(),则最后统一调用。if (requiresFlush) {ctx.flush();}}// 在最后一个 chunk 写完后调用: 处理 ChunkedInput 写入完成后的清理逻辑private static void handleEndOfInputFuture(Future<?> future, ChunkedInput<?> input, PendingWrite currentWrite) {closeInput(input);if (future.isFailed()) {currentWrite.fail(future.cause());} else {currentWrite.success();}}// 处理 非末尾 chunk 写入完成后的回调逻辑, 根据是否写入成功、是否需要继续写,来决定是否恢复 chunk 的发送private void handleFuture(Channel channel, Future<?> future, ChunkedInput<?> input,PendingWrite currentWrite, boolean resume) {if (future.isFailed()) {closeInput(input);currentWrite.fail(future.cause());} else {if (resume && channel.isWritable()) {resumeTransfer();}}}// 关闭分块输入流,捕获并记录异常。private static void closeInput(ChunkedInput<?> chunks) {try {chunks.close();} catch (Throwable t) {logger.warn("Failed to close a ChunkedInput.", t);}}}

ChunkedWriteHandler.PendingWrite

PendingWrite 是一个写任务的封装器,绑定了待写入数据和写完成通知,是实现异步分块写入的基础数据结构。

private static final class PendingWrite {final Object msg;final Promise<Void> promise;PendingWrite(Object msg, Promise<Void> promise) {this.msg = msg;this.promise = promise;}void fail(Throwable cause) {promise.tryFailure(cause);if (Resource.isAccessible(msg, false)) {SilentDispose.dispose(msg, logger);}}void success() {if (promise.isDone()) {return;}promise.trySuccess(null);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87804.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87804.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT 第十二讲 --- 控件篇 LineEdit,TextEdit与ComboBox

前言&#xff1a;欢迎进入 QT 控件世界的第十二讲&#xff01;在上一讲《QT 第十一讲 --- 控件篇 LCDnumber&#xff0c;ProgressBar与CalenderWidget》中&#xff0c;我们探索了用于信息展示和状态反馈的控件&#xff1a;精准的数字显示器 LCD Number、直观的进度指示器 Progr…

VSCode遇到的一些小毛病(自动保存、运行后光标不再处于编辑区)

1. 右键点击Run Code没有触发自动保存 1. 打开 VS Code 设置&#xff08;Ctrl ,&#xff09; 2. 搜索&#xff1a;code runner save 3. 勾选你需要的 2. 运行后光标仍然处于编辑区&#xff08;容易误输入&#xff09; 1. 打开 VS Code 设置&#xff08;Ctrl ,&#xff09; 2.…

Maixcam的使用2

1.单文件和项目&#xff08;多个 py 文件项目/模块化&#xff09;# 在编写代码时&#xff0c;一般两种模式&#xff0c;执行单个文件&#xff0c;或者执行一个完成项目&#xff08;包含多个 py 文件或者其它资源文件&#xff09;。 单文件模式&#xff1a;MaixVision 创建或者…

征信系统架构思想:打造商业信任基石_东方仙盟—仙盟创梦IDE

一、建设必要性在复杂的商业环境中&#xff0c;企业面临多元交易对象与业务场景&#xff0c;准确评估合作方信用状况及潜在价值的难度显著增加。传统经验判断和简单背景调查存在局限性&#xff0c;难以满足现代商业决策需求&#xff0c;因此构建科学的征信体系具有现实必要性。…

网安-XSS-pikachu

介绍 XSS&#xff0c;即跨站脚本攻击&#xff0c;是指攻击者利用Web服务器中的代码漏洞&#xff0c;在页面中嵌入客户端脚本&#xff08;通常是一段由JavaScript编写的恶意代码&#xff09;&#xff0c;当信任此Web服务器的用户访问 Web站点中含有恶意脚本代码的页面&#xff…

算法入门——字典树(C++实现详解)

字典树&#xff08;Trie&#xff09;是处理字符串匹配的高效数据结构&#xff0c;广泛应用于搜索提示、拼写检查等场景。本文将带你从零掌握字典树的原理与实现&#xff01; 一、什么是字典树&#xff1f; 字典树&#xff08;Trie&#xff09;是一种树形数据结构&#xff0c;…

SpringBoot整合SpringCache缓存

SpringBoot整合SpringCache使用缓存 文章目录SpringBoot整合SpringCache使用缓存1.介绍2.SpringBoot整合1.导入xml依赖2.配置yml3.使用EnableCaching启用SpringCache4.Cacheable5.CachePut6.CacheEvict7. Caching8.CacheConfig3.其他属性配置1.keyGenerator 属性2. cacheManage…

WPF学习笔记(20)Button与控件模板

Button与控件模板一、 Button默认控件模板详解二、自定义按钮模板一、 Button默认控件模板详解 WPF 中的大多数控件都有默认的控件模板。 这些模板定义了控件的默认外观和行为&#xff0c;包括控件的布局、背景、前景、边框、内容等。 官方文档&#xff1a;https://learn.mic…

蓝天居士自传(1)

蓝天居士何许人&#xff1f; 蓝天居士是我的笔名&#xff0c;也可以说是号。就好像李白号青莲居士、欧阳修号六一居士一样。笔者本名彭昊 —— 一个有不少重名重姓者的名字。 笔者小的时候上语文课&#xff0c;无论是小学、初中抑或是高中&#xff0c;都会有鲁迅&#xff08;…

短剧系统开发定制全流程解析:从需求分析到上线的专业指南

一、短剧行业数字化趋势与系统开发必要性在短视频内容爆发式增长的时代背景下&#xff0c;短剧作为一种新兴的内容形式正在迅速崛起。数据显示&#xff0c;2023年中国短剧市场规模已突破300亿元&#xff0c;用户规模达到4.5亿&#xff0c;年增长率超过200%。这一迅猛发展的市场…

getBoundingClientRect() 详解:精准获取元素位置和尺寸

getBoundingClientRect() 是 JavaScript 中一个强大的 DOM API&#xff0c;用于获取元素在视口中的精确位置和尺寸信息。它返回一个 DOMRect 对象&#xff0c;包含元素的坐标、宽度和高度等关键几何信息。 基本用法 const element document.getElementById(myElement); cons…

EXCEL 基础技巧

来源&#xff1a;WPS 官网 初步了解WPS表格-WPS学堂https://www.wps.cn/learning/course/detail/id/635.html 1、格式刷 1.1使用格式刷隔行填充颜色。 首先设置部分表格颜色&#xff0c;选中此区域&#xff0c;双击点击格式刷&#xff0c;然后选中其他表格区域。 这样就可以…

【RK3568 编译rtl8723DU驱动】

RK3568 编译rtl8723DU驱动 编译源码1.解压rtl8723du2.修改Makefile 验证1.加载模块2.开启wifi 在驱动开发中&#xff0c;驱动的编译与集成是实现设备功能的关键环节。本文聚焦于基于 RK3568 处理器平台编译 RTL8723DU WiFi/BT 二合一模块驱动的完整流程&#xff0c;涵盖源码编译…

基于Simulink的二关节机器人独立PD控制仿真

文章目录 理论模型仿真窗口控制函数目标函数仿真 本文是刘金琨. 机器人控制系统的设计与MATLAB仿真的学习笔记。 理论模型 对于二关节机器人系统&#xff0c;其动力学模型为 D ( q ) q C ( q , q ˙ ) q ˙ r D(q)\ddot qC(q,\dot q)\dot q r D(q)q​C(q,q˙​)q˙​r 式…

【技术架构解析】国产化双复旦微FPGA+飞腾D2000核心板架构

本文就一款基于飞腾D2000核心板与两片高性能FPGA的国产化开发主板进行技术解析&#xff0c;包括系统架构、主要硬件模块、关键接口及软件环境&#xff0c;重点阐述各子系统间的数据路径与协同工作方式&#xff0c;旨在为行业内同类产品设计与应用提供参考。 随着国产化要求的加…

Python 数据分析:计算,分组统计1,df.groupby()。听故事学知识点怎么这么容易?

目录1 示例代码2 欢迎纠错3 论文写作/Python 学习智能体1 示例代码 直接上代码。 def grpby1():xls "book.xls"df pd.DataFrame(pd.read_excel(xls, engine"xlrd"))print(df)"""序号 分类 销量0 1 文学 51 2 计算机…

【解决“此扩展可能损坏”】Edge浏览器(chrome系列通杀))扩展损坏?一招保留数据快速修复

引言 如果你想保留你的数据&#xff0c;敲重点&#xff1a;不要点击修复&#xff0c;不要修复&#xff0c;不要修复 在使用 Microsoft Edge 浏览器时&#xff0c;您可能会遇到扩展程序显示“此扩展程序可能已损坏”的提示&#xff0c;且启用按钮无法点击。这一问题让许多用户感…

AI专业化应用加速落地,安全治理挑战同步凸显

7月2日&#xff0c;2025全球数字经济大会在北京国家会议中心开幕。本届大会以“建设数字友好城市”为主题&#xff0c;聚焦数字技术对城市发展的影响。开幕式上&#xff0c;一首完全由AI生成的MV成为焦点——从歌词、谱曲、演唱到视频制作全流程AI生成&#xff0c;展现人工智能…

Python统一调用多家大模型API指南

随着大模型技术的快速发展&#xff0c;市场上出现了越来越多的LLM服务提供商&#xff0c;包括OpenAI、Anthropic、Google、百度、阿里云等。作为开发者&#xff0c;我们经常需要在不同的模型之间切换&#xff0c;或者同时使用多个模型来满足不同的业务需求。本文将详细介绍如何…

【ESP32】1.编译、烧录、创建工程

标题打开一个Hello world工程并烧录 点击环境搭建链接 遇到的问题&#xff1a; 1.ESP32在VSCODE中烧录代码时&#xff0c;跳出窗口&#xff0c;OPenOCD is not running ,do you want to launch it? 可能是OCD没安装&#xff0c;重新安装 ESP-IDF试一下&#xff0c;在终端命令窗…