SpringBatch处理数据性能优化

SpringBatch的Step默认使用同步方式批量处理数据,也可以通过配置将读数改为同步,处理和写入改为异步方式。

1、同步处理Step

SpringBatch的Step一般由ItemReader、ItemProcessor和ItemWriter组成,其中ItemProcessor是可选的。他的设计思路的通过ItemReader读取一条数据之后,汇总到inputs中,当达到chunkSize数量时,使用ItemProcessor处理数据,然后使用ItemWriter写入。

这个过程都是同步的操作,不存在异步的过程。实际业务处理过程中,数据一般来源于数据库,如果每次只读取一条数据,效率比较低,可以采用批量读取数据,单条返回的方式提高效率。如:DataReader通过游标批量读取数据

public class DataReader implements ItemReader<InputType> {// 游标记录上个批次读取的数据private long lastRowId = 0;// 单次数据库读取的数据量private int batchSize;// 数据缓存迭代器private Iterator<InputType> cacheIterator;public DataReader(int batchSize) {this.batchSize = batchSize;}@Overridepublic InputType read() throws Exception {if (cacheIterator == null || !cacheIterator.hasNext()) {// 使用游标方式批量查询数据库List<InputType> batchList = ....if (batchList == null || batchList.isEmpty()) {return null; // 读取结束}// 更新lastRowId为当前批次最后一条的rowIdlastRowId = batchList.get(batchList.size() - 1).getRowId();cacheIterator = batchList.iterator();}// 迭代器返回一条数据return cacheIterator.next();}
}

DataProcessor将读取的数据做业务处理,转化为OutputType类型数据传递给ItemWriter

public class DataProcessor implements ItemProcessor<InputType, OutputType> {@Overridepublic OutputType process(InputType item) throws Exception {// 处理item,转换为OutputTypereturn output;}
}

DataWriter中写入OutputType类型数据到数据库

public class DataWriter implements ItemWriter<OutputType> {@Overridepublic void write(List<? extends OutputType> items) throws Exception {// 写入数据都数据库。。。}
}

配置同步Step

return stepBuilderFactory.get("step1").<InputType, OutputType> chunk(100)  // 每100条数据为一个批次,执行processor和writer.reader(new DataReader(1000)) // 数据库每次读取1000条.processor(new DataProcessor()).writer(new DataWriter()).build();

2、异步处理Step

在大数据量的批处理系统中,希望尽可能地提高性能,这时可以将ItemProcossor和ItemWriter环节采用异步多线程的方式进行优化,这是需要将ItemProcossor和ItemWriter分别包装为AsyncItemProcessor和AsyncItemWriter,如下方法可以实现包装:

private <I, O> AsyncItemProcessor<I, O> wrapAsyncProcessor(ItemProcessor<I, O> processor,TaskExecutor taskExecutor) {AsyncItemProcessor<I, O> asyncItemProcessor = new AsyncItemProcessor<>();asyncItemProcessor.setDelegate(processor);asyncItemProcessor.setTaskExecutor(taskExecutor);return asyncItemProcessor;
}private <O> AsyncItemWriter<O> wrapAsyncWriter(ItemWriter<O> writer) {AsyncItemWriter<O> asyncItemWriter = new AsyncItemWriter<>();asyncItemWriter.setDelegate(writer);return asyncItemWriter;
}

配置异步Step

private Step step2() {AsyncItemProcessor<PayOrderPo, PayOrderPo> asyncItemProcessor =wrapAsyncProcessor(new DataProcessor(), getAsyncExecutor("TestJobPool"));AsyncItemWriter<PayOrderPo> asyncItemWriter = wrapAsyncWriter(new DataWriter());return stepBuilderFactory.get("step2").<PayOrderPo, Future<PayOrderPo>> chunk(500).reader(new DataReader(1000)).processor(asyncItemProcessor).writer(asyncItemWriter).build();
}
private TaskExecutor getAsyncExecutor(String threadPoolName) {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(4);executor.setMaxPoolSize(8);executor.setQueueCapacity(200);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix(threadPoolName + "-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setAllowCoreThreadTimeOut(true);executor.initialize();return executor;
}

AsyncItemProcessor使用了代理模式,内部代理到ItemProcessor进行实际数据处理,通过taskExecutor线程池异步高性能处理数据

public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean {// 代理的ItemProcessorprivate ItemProcessor<I, O> delegate;private TaskExecutor taskExecutor = new SyncTaskExecutor();public void afterPropertiesSet() throws Exception {Assert.notNull(delegate, "The delegate must be set.");}@Nullablepublic Future<O> process(final I item) throws Exception {final StepExecution stepExecution = getStepExecution();FutureTask<O> task = new FutureTask<>(new Callable<O>() {public O call() throws Exception {if (stepExecution != null) {StepSynchronizationManager.register(stepExecution);}try {// 代理的processor实际处理数据return delegate.process(item);}finally {if (stepExecution != null) {StepSynchronizationManager.close();}}}});// 提交异步任务taskExecutor.execute(task);return task;}
}

处理的过程如下:

  1. 创建FutureTask,在其线程中实际调用 process(item) 方法进行数据处理。
  2. 通过 TaskExecutor 异步执行任务FutureTask
  3. 返回 Future 对象给Writer来跟踪异步结果。

AsyncItemWriter同样使用了代理模式,代理到实际处理数据的ItemWriter,主要通过两个步骤进行:

1、获取processor环境的异步处理结果

2、汇总结果到实际的ItemWriter进行数据写入

public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean {// 代理的ItemWriterprivate ItemWriter<T> delegate;public void write(List<? extends Future<T>> items) throws Exception {// 用于保存异步结果List<T> list = new ArrayList<>();// 获取异步结果for (Future<T> future : items) {try {T item = future.get();if(item != null) {list.add(future.get());}}catch (ExecutionException e) {Throwable cause = e.getCause();if(cause != null && cause instanceof Exception) {logger.debug("An exception was thrown while processing an item", e);throw (Exception) cause;}else {throw e;}}}// 代理到实际的ItemWriter进行数据写入delegate.write(list);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/86814.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/86814.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习深度学习】前馈神经网络(单隐藏层)

目录 一、什么是前馈神经网络&#xff1f; 二、数学表达式是什么&#xff1f; 三、为什么需要“非线性函数”&#xff1f; 四、NumPy 实现前馈神经网络代码示例 五、 运行结果 六、代码解析 6.1 初始化部分 6.2 前向传播 6.3 计算损失&#xff08;Loss&#xff09; 6…

设计模式系列(08):创建型模式 - 原型模式

系列导读&#xff1a;完成创建型模式的学习&#xff0c;我们来看最后一个创建型模式——原型模式。它通过复制已有对象来创建新对象&#xff0c;是一种独特的创建方式。 解决什么问题&#xff1a;通过复制现有对象来创建新对象&#xff0c;而不是重新实例化。适用于对象创建成本…

区块链到底是什么?

区块链本质上是一种去中心化的分布式账本技术&#xff0c;具有以下核心特点&#xff1a; - 去中心化&#xff1a;没有中央管理机构&#xff0c;数据由网络中的多个节点共同维护&#xff0c;比如比特币网络中各个节点都保存着完整账本。 - 分布式存储&#xff1a;数据不是存在一…

系统架构设计师论文分享-论ATAM的使用

我的软考历程 摘要 2023年2月&#xff0c;我司通过了研发纱线MES系统的立项&#xff0c;该系统为国内纱线工厂提供SAAS服务&#xff0c;旨在提高纱线工厂的数字化和智能化水平。我在本项目中担任系统架构设计师&#xff0c;负责整个项目的架构设计工作。本文结合我在该项目中…

vue-28(服务器端渲染(SSR)简介及其优势)

服务器端渲染&#xff08;SSR&#xff09;简介及其优势 服务器端渲染&#xff08;SSR&#xff09;是现代网络应用的关键技术&#xff0c;特别是使用 Vue.js 等框架构建的应用。它通过在服务器上渲染初始应用状态来弥补传统单页应用&#xff08;SPA&#xff09;的局限性&#x…

工业电子 | 什么是SerDes,为何工业和汽车应用需要它?

重点内容速览&#xff1a; 1. 什么是SerDes&#xff1f; 2. ADI&#xff1a;私有协议的GMSL将向公有协议转变 3. TI&#xff1a;工业和汽车有两套SerDes解决方案 4. Microchip&#xff1a;推出通用协议SerDes芯片 5. 罗姆&#xff1a;主要针对汽车领域 6. 国产SerDes芯…

大事件项目记录4-用户接口开发-更新用户基本信息

4&#xff09;更新用户基本信息。 UserController.java&#xff1a; UserMapper.java&#xff1a; Update("update user set nickname #{nickname},email #{email},update_time #{updateTime} where id #{id}")void update(User user); UserServiceInterface…

Transformer结构--输入编码(BPE,PE)

在Transformer结构中&#xff0c;输入编码是模型处理文本数据的关键步骤&#xff0c;其中**BPE&#xff08;Byte Pair Encoding&#xff0c;字节对编码&#xff09;和PE&#xff08;Positional Encoding&#xff0c;位置编码&#xff09;**是两种重要的编码方式&#xff0c;它们…

Confluence-测试用例设计指导方法

测试经验知识库 典型的测试场景验证点各个项目有价值的经验和测试点 测试经验知识库 - 草稿测试用例执行量化指导建议 何时需要进行全量测试和如何定义和执行测试用例量的一些建议和标准 端对端&#xff08;E2E&#xff09;测试用例设计指导方案 在测试行业中&#xff0c;端到端…

浅析JVM

一、JVM运行流程 如图&#xff1a; JVM由四个部分构成&#xff1a; 1.类加载器 加载类文件到内存2.运行时数据区 写的程序需要加载到这里才能运行3.执行引擎 负责解释命令&#xff0c;提交操作系统执行4.本地接口 融合不同编程语言为java所用&#xff0c;如Java程序驱动打印…

多个 Job 并发运行时共享配置文件导致上下文污染,固化 Jenkins Job 上下文

基于 context.py 固化 Jenkins Job 上下文的完整方案&#xff0c;适用于你当前的工作流&#xff08;Python Jenkins Pipeline&#xff09;&#xff0c;解决&#xff1a; 多个 Job 并发运行时共享配置文件导致上下文污染&#xff1b;读取环境变量或 JSON 文件时被其他 Job 修改…

简木易支付系统 功能齐全,对接接口超多

简木易支付系统&#xff0c;作为一款引领行业潮流的卓越支付解决方案&#xff0c;依托先进的 PHP MySQL 技术架构精心打造。在开发过程中&#xff0c;它巧妙运用了功能强大的 ThinkPHP8 框架&#xff0c;完美融合前端主流技术 Vue、Element 以及 Layuiadmin&#xff0c;共同铸…

【软考高项论文】信息系统项目的人力资源管理

摘要 本文围绕信息系统项目的人力资源管理展开论述。以我在2024年参与的为大型国有企业构建供应链管理系统项目为例&#xff0c;阐述了项目人力资源管理的主要流程&#xff0c;包括规划、组建、建设和管理团队四个过程&#xff0c;以及所运用的工具和理论。同时&#xff0c;分…

【EI会议征稿】东北大学主办第三届机器视觉、图像处理与影像技术国际会议(MVIPIT 2025)

一、会议信息 大会官网&#xff1a;www.mvipit.org 官方邮箱&#xff1a;mvipit163.com 会议地点&#xff1a;辽宁沈阳 主办单位&#xff1a;东北大学 会议时间&#xff1a;2025 年 9 月 27 日-9 月 29 日 二、征稿主题 集中但不限于“机器视觉、图像处理与影像技术”等其…

从零开始的云计算生活——第二十三天,稍作休息,Tomcat

目录 一.故事背景 二.Tomcat概述 1、Tomcat介绍 2、Tomcat历史 二、Tomcat原理分析 1、Http工作原理 2、Tomcat整体架构 3、Coyote连接器架构 4、Catalina容器架构 5、Jasper处理流程 6、JSP编译过程 7、Tomcat启动流程 8、Tomcat请求处理流程 三、Tomcat安装与配…

几种基于Doherty结构的GAN氮化镓功放设计方法介绍

功率放大器是现代无线通信系统中最重要的组件之一。理想情况下&#xff0c;它们能够以高线性度和高效率提供高输出功率。但通常在这三个关键的功率放大器性能参数之间需要进行权衡取舍&#xff0c;而且具有最高输出功率和线性度的放大器往往会牺牲效率。 在支持宽带宽和高数据…

前端打印计算单位 cm、mm、px

A4 纵向 宽&#xff1a;21cm&#xff0c;210mm&#xff0c;793.698px 高&#xff1a;29.7cm&#xff0c;297mm&#xff0c;1122.520px A4 横向 宽&#xff1a;29.7cm&#xff0c;297mm&#xff0c;1122.520px 高&#xff1a;21cm&#xff0c;210mm&#xff0c;793.698px …

c# sugersql 获取子表数据排序

在C#中使用Sugar ORM&#xff08;一个流行的.NET ORM框架&#xff09;获取子表数据并进行排序&#xff0c;可以通过以下几种方式实现&#xff1a; 1. 使用HasMany或HasOne配置 首先&#xff0c;确保你在配置实体时已经正确设置了HasMany或HasOne关系。例如&#xff0c;假设你…

【nRF52832】【环境搭建 3】【如何新建一个纯单片机开发的工程】

1. 前言 笨叔&#xff0c;又要开始扯淡了!!! 不感兴趣的同学&#xff0c;可以跳过了!!! 笨叔之前在大学里面&#xff0c; 刚接触单片机时。就被 windows 平台 例如 keill 5 、IAR 等一堆开会环境差点劝退。 当时也是坚持咬牙一点点摸索过来的。刚摸索明白&#xff0c;觉得单片…

Spring-loC与DI

目录 1 loC控制反转思想 2 DI依赖注入 3 loC详解 3.1 存储Bean &#xff08;1&#xff09;Controller &#xff08;2&#xff09;Service &#xff08;3&#xff09;Repository &#xff08;4&#xff09;Component &#xff08;5&#xff09;Configuration &#xf…