SpringBatch配置与入门实例

通过对SpringBatch基础概念的了解,参考:SpringBatch使用介绍
任何技术用起来之后,再去探究内部细节的原理,才会事半功倍。下面记录一下笔者在SpringBoot项目中集成SpringBatch,并且通过一个小的实例展示如何简单使用它进行批处理。

1、依赖与配置

pom依赖

需要引入以下依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>

我这里用的SpringBoot版本是2.7.18,引入spring-boot-starter-batch时不需要再指定版本号,他会自动匹配对应版本,拉去maven依赖。

基础配置

需要新建配置类SpringBatchConfig,用于配置基础的JobRepository 、JobLauncher、JobBuilderFactory 、StepBuilderFactory 等 ,虽然官方文档说可以自动配置,但实际在项目中自定义配置可控性更强一些。

@Configuration
@EnableBatchProcessing // 开启SpringBatch
public class SpringBatchConfig {/*** 创建JobRepository. 用于存储SpringBatch job运行的元信息** @param dataSource         数据源,这里自动注入的是MySQL的数据源* @param transactionManager 事务管理器* @return JobRepository* @throws Exception 异常*/@Beanpublic JobRepository createJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();// 支持多种数据源类型,如:MySQL、H2、DB2、Oracle等jobRepositoryFactoryBean.setDatabaseType("MySQL");jobRepositoryFactoryBean.setTransactionManager(transactionManager);jobRepositoryFactoryBean.setDataSource(dataSource);return jobRepositoryFactoryBean.getObject();}/*** 创建JobLauncher用于启动Job** @return JobLauncher*/@Beanpublic SimpleJobLauncher createJobLauncher(JobRepository jobRepository) {SimpleJobLauncher jobLauncher = new SimpleJobLauncher();jobLauncher.setJobRepository(jobRepository);return jobLauncher;}/*** 创建JobBuilderFactory,用于创建Job** @param jobRepository JobRepository* @return JobBuilderFactory*/@Bean@Primarypublic JobBuilderFactory createJobBuilderFactory(JobRepository jobRepository) {return new JobBuilderFactory(jobRepository) {@Overridepublic JobBuilder get(String name) {return super.get(name).listener(createTimeWatchJobListener());}};}/*** 创建Job种的StepBuilderFactory,用于创建Step** @param jobRepository      JobRepository* @param transactionManager 事务管理器* @return StepBuilderFactory*/@Bean@Primarypublic StepBuilderFactory createStepBuilderFactory(JobRepository jobRepository, PlatformTransactionManager transactionManager) {return new StepBuilderFactory(jobRepository, transactionManager);}/*** 注册job监听器用于监听Job执行时间** @return TimeWatchJobListener*/@Beanpublic TimeWatchJobListener createTimeWatchJobListener() {return new TimeWatchJobListener();}
}

TimeWatchJobListener是一个自定义的Job运行监听器,这里只是监听JOB运行的时间,提示开始和结束

@Slf4j
public class TimeWatchJobListener implements JobExecutionListener {private static final Map<Long, StopWatch> STOP_WATCH_MAP = new ConcurrentHashMap<>();@Overridepublic void beforeJob(JobExecution jobExecution) {StopWatch stopWatch = new StopWatch();stopWatch.start();STOP_WATCH_MAP.put(jobExecution.getJobId(), stopWatch);log.info("job start, jobId={}", jobExecution.getJobId());}@Overridepublic void afterJob(JobExecution jobExecution) {StopWatch stopWatch = STOP_WATCH_MAP.get(jobExecution.getJobId());if (Objects.nonNull(stopWatch)) {stopWatch.stop();double seconds = stopWatch.getTotalTimeSeconds();log.info("job end, time cost={}s for jobId={}", seconds, jobExecution.getJobId());STOP_WATCH_MAP.remove(jobExecution.getJobId());}log.info("job end, jobId={}", jobExecution.getJobId());}
}

配置文件中主要配置启用springbatch和自动建框架表

spring:application:name: springbatch-learning# 数据源datasource:name: local-dstype: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=GMT%2B8&useSSL=falseusername: xxxxpassword: xxxxdruid:  #druid相关配置filters: stat #监控统计拦截的filtersinitial-size: 1  #配置初始化大小/最小/最大min-idle: 1max-active: 20max-wait: 60000 #获取连接等待超时时间time-between-eviction-runs-millis: 60000  #间隔多久进行一次检测,检测需要关闭的空闲连接min-evictable-idle-time-millis: 300000  #一个连接在池中最小生存的时间validation-query: SELECT 'x'test-while-idle: truetest-on-borrow: falsetest-on-return: falsepool-prepared-statements: false #打开PSCache,并指定每个连接上PSCache的大小。oracle设为true,mysql设为false。分库分表较多推荐设置为falsemax-pool-prepared-statement-per-connection-size: 20batch:jdbc:initialize-schema: always  # 自动创建Spring Batch的表结构job:enabled: false

上文中配置了JobRepository数据持久化为MySQL数据库,并且配置了属性spring.batch.jdbc.initialize-schema=always,则首次启动应用会在数据库新建SpringBatch的框架表,表之间的ER图如下:
在这里插入图片描述

2、入门实例

业务背景

用户下单之后,每天定时从订单表中获取待发货数据,批处理之后生成发货信息,并且更新订单表的状态。每天数据量50W

实践

定义数据读取器DataReader

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;import java.util.Iterator;
import java.util.List;@Slf4j
public class DataReader implements ItemReader<OrderPo> {// 游标记录上个批次读取的数据private long lastRowId = 0;// 单次数据库读取的数据量private final int batchSize;// 数据缓存迭代器private Iterator<OrderPo> cacheIterator;public DataReader(int batchSize) {this.batchSize = batchSize;}@Overridepublic OrderPo read() throws Exception {if (cacheIterator == null || !cacheIterator.hasNext()) {// 使用游标方式批量查询数据库OrderMapper orderMapper = AppContextUtil.getBean(OrderMapper.class);QueryWrapper<OrderPo> cond = new QueryWrapper<>();cond.eq("status", 3);cond.gt("row_id", lastRowId);cond.last("limit " + batchSize);List<OrderPo> batchList = orderMapper.selectList(cond);if (batchList == null || batchList.isEmpty()) {return null; // 读取结束}// 更新lastRowId为当前批次最后一条的rowIdlastRowId = Long.parseLong(batchList.get(batchList.size() - 1).getRowId());log.info("lastRowId={},read data size={}", lastRowId, batchList.size());cacheIterator = batchList.iterator();}// 迭代器返回一条数据return cacheIterator.next();}
}

定义数据处理器,用于构造发货数据

import org.springframework.batch.item.ItemProcessor;public class DataProcessor implements ItemProcessor<OrderPo, OrderDelivery> {@Overridepublic OrderDelivery process(OrderPo item) throws Exception {// 处理itemitem.setStatus(4);DeliveryPo deliveryPo = new DeliveryPo();deliveryPo.setDeliveryId(SnowflakeIdGenerator.generatedIdStr());deliveryPo.setStatus(1);deliveryPo.setReceiveInfo("收件人:张三");deliveryPo.setSendInfo("发件人:李四");deliveryPo.fillCreateInfo();item.setDeliveryId(deliveryPo.getDeliveryId());return OrderDelivery.builder().orderPo(item).deliveryPo(deliveryPo).build();}
}

定义数据写入器

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;import java.util.List;
import java.util.stream.Collectors;@Slf4j
public class DataWriter implements ItemWriter<OrderDelivery> {@Overridepublic void write(List<? extends OrderDelivery> items) throws Exception {OrderMapper orderMapper = AppContextUtil.getBean(OrderMapper.class);DeliveryMapper deliveryMapper = AppContextUtil.getBean(DeliveryMapper.class);List<OrderPo> orderPoList = items.stream().map(OrderDelivery::getOrderPo).collect(Collectors.toList());List<DeliveryPo> deliveryPoList = items.stream().map(OrderDelivery::getDeliveryPo).collect(Collectors.toList());deliveryMapper.insertBatchSomeColumn(deliveryPoList);orderPoList.forEach(orderMapper::updateById);log.info("write deliveryPoList.size={},orderPoList.size={}", deliveryPoList.size(), orderPoList.size());}
}

创建任务OrderJob ,主要用于构建Job和对应的Step,在Step中对上述定义的读取、处理、写入进行组合

@Component
public class OrderJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;  // 用于创建Job@Autowiredprivate StepBuilderFactory stepBuilderFactory;  // 用于创建Steppublic Job createJob() {return jobBuilderFactory.get("order-delivery-"+ SnowflakeIdGenerator.generatedIdStr())  // 定义作业名称.start(orderDeliveryStep())  // 定义作业的起始步骤.build();}public Step orderDeliveryStep() {return stepBuilderFactory.get("orderDeliveryStep").<OrderPo, OrderDelivery>chunk(2000).reader(new DataReader(500)).processor(new DataProcessor()).writer(new DataWriter()).build();}
}

构建测试用例

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
@Slf4j
class JobCreatorTest {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate OrderJob orderJob;@Testpublic void runJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException, InterruptedException {Job job = orderJob.createJob();jobLauncher.run(job,new JobParameters());Thread.sleep(30*1000);}
}
补充mybatis-plus相关类

新建表

drop table if exists order_t;
CREATE TABLE `order_t`
(`row_id`           varchar(32)    NOT NULL COMMENT 'row_id主键',`order_no`         varchar(64) DEFAULT NULL COMMENT '订单编号',`trade_date`       date           NOT NULL COMMENT '交易日期',`pay_money`        decimal(16, 3) NOT NULL COMMENT '支付金额',`status`           tinyint(4)  DEFAULT '1' COMMENT '状态,1:已提交,2:已付款,3:待发货,4:已发货,5:已收货,6:已完成',`delivery_id`      varchar(32) DEFAULT NULL COMMENT '物流ID',`create_by`        varchar(64) DEFAULT NULL COMMENT '创建人',`creation_date`    datetime    DEFAULT NULL COMMENT '创建时间',`last_update_by`   varchar(64) DEFAULT NULL COMMENT '修改人',`last_update_date` datetime    DEFAULT NULL COMMENT '修改时间',PRIMARY KEY (`row_id`),KEY `idx_orderDate` (`trade_date`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8 COMMENT ='订单表';drop table if exists delivery_t;
CREATE TABLE `delivery_t`
(`delivery_id`      varchar(32) NOT NULL COMMENT 'delivery_id主键',`receive_info`     varchar(512) DEFAULT NULL COMMENT '收货信息',`send_info`        varchar(512) DEFAULT NULL COMMENT '发货信息',`status`           tinyint(4)   DEFAULT '1' COMMENT '状态,1:揽件中,2:已发货,3:运输中,4:已收货,5:拒收',`create_by`        varchar(64)  DEFAULT NULL COMMENT '创建人',`creation_date`    datetime     DEFAULT NULL COMMENT '创建时间',`last_update_by`   varchar(64)  DEFAULT NULL COMMENT '修改人',`last_update_date` datetime     DEFAULT NULL COMMENT '修改时间',PRIMARY KEY (`delivery_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8 COMMENT ='发货表';

订单PO

@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("order_t")
public class OrderPo extends BasePo{private static final long serialVersionUID = 6895153403831336415L;@TableId("row_id")private String rowId;@TableField("order_no")private String orderNo;@TableField("trade_date")private Date tradeDate;@TableField("pay_money")private BigDecimal payMoney;/*** '状态,1:已提交,2:已付款,3:待发货,4:已发货,5:已收货,6:已完成'*/@TableField("status")private int status;@TableField("delivery_id")private String deliveryId;
}

发货PO

@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("delivery_t")
public class DeliveryPo extends BasePo{private static final long serialVersionUID = -3711161041800162057L;@TableId("delivery_id")private String deliveryId;@TableField("receive_info")private String receiveInfo;@TableField("send_info")private String sendInfo;/*** 状态,1:揽件中,2:已发货,3:运输中,4:已收货,5:拒收*/@TableField("status")private int status;
}

BasePo

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BasePo implements Serializable {private static final long serialVersionUID = -1914144713260446591L;@TableField("create_by")private String createBy;@TableField("creation_date")private Date creationDate;@TableField("last_update_by")private String lastUpdateBy;@TableField("last_update_date")private Date lastUpdateDate;public void fillCreateInfo() {this.createBy="system";this.creationDate = new Date();this.lastUpdateBy = "system";this.lastUpdateDate = new Date();}public void fillUpdateInfo() {this.lastUpdateBy = "system";this.lastUpdateDate = new Date();}
}

mapper相关

public interface OrderMapper extends CommonMapper<OrderPo> {
}public interface DeliveryMapper extends CommonMapper<DeliveryPo> {
}public  interface CommonMapper<T> extends BaseMapper<T> {Integer insertBatchSomeColumn(List<T> list);
}

批量插入mybatis-plus配置

@Configuration
public class MyBatisPlusConfig {@Beanpublic DefaultSqlInjector insertBatchSqlInject() {return new DefaultSqlInjector() {@Overridepublic List<AbstractMethod> getMethodList(org.apache.ibatis.session.Configuration configuration, Class<?> mapperClass, TableInfo tableInfo) {List<AbstractMethod> methodList = super.getMethodList(configuration,mapperClass, tableInfo);methodList.add(new InsertBatchSomeColumn());return methodList;}};}
}

测试效果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/89842.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/89842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spdlog 项目介绍与二次封装

目录 介绍 二次封装 介绍 spdlog 是C开源的第三方日志库&#xff0c;整个项目在 spdlog 命名空间中。 在 spdlog 命名空间的 level 命名空间里定义了枚举类型&#xff0c;把日志分为了 5 个等级&#xff1a;trace debug info warn err critical enum level_enum : in…

shell编程之awk命令详解

1. awk 教程 1.1 调用 awk awk 是一种强大的文本处理工具&#xff0c;在 Linux 系统中广泛应用于日志分析、数据处理等场景。调用 awk 主要有以下三种方式&#xff1a; 1.1.1 命令行方式 基本语法为&#xff1a; awk (-F filed-separator) commands input-files其中&#…

服务器需要备案吗?在哪些地区需要备案?

&#x1f3af; 服务器是否需要备案&#xff1f; 是否需要备案&#xff0c;关键看以下两个因素&#xff1a; 服务器所在地&#xff08;机房位置&#xff09; 网站面向的访问群体&#xff08;境内或境外&#xff09; &#x1f3f7; 中国大陆&#xff08;境内&#xff09;服务器…

HarmonyOS学习3---ArkUI

1、组件 1.1、基础组件 1.2、布局容器 1.3、页面导航 1.4、其他组件 2、ArkTs/C混合开发&#xff0c;高性能编码 3、布局能力&交互归一 4、实时开发预览

Java学习第十五部分——MyBatis

目录 一.概述 二.特点 三.组件 四.Mapper 五.配置文件 六.使用步骤 七.高级功能 八.优点缺点 九.项目实战 1.打开idea创建一个Java项目&#xff0c;构建系统选“Maven”​ 2.创建完成后若依赖报错&#xff0c;可通过下载或重新加载来解决​ 3.配置pom.xml文件&…

小企业如何搭建本地私有云服务器,并设置内部网络地址提供互联网访问

在数字化时代&#xff0c;很多普通公司小企业规模的&#xff0c;利用本地小型服务器或计算机搭建私有云服务器&#xff0c;不仅可以提升数据管理效率&#xff0c;还能保障业务数据的安全性和灵活性。以下是为小企业量身定制的私有云服务器搭建指南&#xff0c;及最后附无公网IP…

MySQL 八股文【持续更新ing】

MySQL 八股文【持续更新ing】 文章目录 MySQL 八股文【持续更新ing】前言一、MySQL的存储引擎有哪些&#xff1f;他们之间有什么区别&#xff1f;二、MySQL InnoDB 引擎中的聚簇索引和非聚簇索引有什么区别&#xff1f;1.InnoDB 中的聚簇索引2.InnoDB 中的非聚簇索引 三、MySQL…

每日算法刷题Day42 7.5:leetcode前缀和3道题,用时2h

7. 3026.最大好子数组和(中等,学习) 3026. 最大好子数组和 - 力扣&#xff08;LeetCode&#xff09; 思想 1.给你一个长度为 n 的数组 nums 和一个 正 整数 k 。 如果 nums 的一个子数组中&#xff0c;第一个元素和最后一个元素 差的绝对值恰好 为 k &#xff0c;我们称这个…

Linux操作系统之文件(四):文件系统(上)

前言&#xff1a; 我们前几篇文章讲了缓冲区与重定向的有关概念&#xff0c;这些设计是linux系统的核心机制&#xff0c;对系统性能、资源管理和用户操作灵活性有重要意义。 不涉及一些硬件就不可能让大家清楚地去理解文件系统&#xff0c;所以这篇文章&#xff0c;我将会从计…

java中,stream的filter和list的removeIf筛选速度比较

在 Java 里&#xff0c;Stream 的filter和 List 的removeIf筛选效率要依据具体情形来判断。 1. 操作本质有别 Stream 的 filter&#xff1a; 它是一种中间操作&#xff0c;不会立刻执行&#xff0c;而是把筛选条件记录下来。只有遇到终端操作时&#xff0c;才会开始处理元素。…

Python(28)Python循环语句指南:从语法糖到CPython字节码的底层探秘

目录 引言一、推导式家族全解析1.1 基础语法对比1.2 性能对比测试 二、CPython实现揭秘2.1 字节码层面的秘密2.2 临时变量机制 三、高级特性实现3.1 嵌套推导式优化3.2 条件表达式处理 四、性能优化指南4.1 内存使用对比4.2 执行时间优化技巧 五、最佳实践建议六、总结&#x1…

深度分析:Microsoft .NET Framework System.Random 的 C++ 复刻实现

深度分析&#xff1a;Microsoft .NET Framework Random 的 C 复刻实现 核心原理与算法结构 本实现基于 Knuth 减随机数生成器&#xff08;Subtractive Random Number Generator&#xff09;&#xff0c;是 .NET Framework 中 System.Random 的精确复刻。其核心特点包括&#x…

[论文阅读] 人工智能 | 在非CUDA硬件上运行几何学习:基于Intel Gaudi-v2 HPU的PyTorch框架移植实践

在非CUDA硬件上运行几何学习&#xff1a;基于Intel Gaudi-v2 HPU的PyTorch框架移植实践 论文标题&#xff1a;PyTorch-based Geometric Learning with Non-CUDA Processing Units: Experiences from Intel Gaudi-v2 HPUs arXiv:2507.01031 (cross-list from cs.LG) PyTorch-ba…

Python-多线程-threading

1 需求 2 接口 3 示例 4 参考资料 Python treading 模块 | 菜鸟教程

2025年- H91-Lc199-- 62.不同路径(多维动态规划)--Java版

1.题目描述 2.思路 dp含义&#xff1a;代表到当前位置的路径数 递推公式&#xff1a;dp[i][j]dp[i-1][j]dp[i][j-1] dp数组初始化&#xff0c;我们要确保第一行和第一列是有值的. dp数组的遍历顺序&#xff1a;我们需要从左往右遍历&#xff0c;从上往下遍历。并且把第一行和第…

char 不是 Java 中的 2 字节(16 位)吗? 为什么用 UTF-8 编码写入时,一个中文要占 3 个字节?

char 不是 Java 中的 2 字节&#xff08;16 位&#xff09;吗&#xff1f; 为什么用 UTF-8 编码写入时&#xff0c;一个中文要占 3 个字节&#xff1f; ✅ 一、Java 中的 char 是什么&#xff1f; Java 的 char 是一个 固定大小的 2 字节&#xff08;16 位&#xff09;类型&am…

【Elasticsearch】检索排序 分页

检索排序 & 分页 1.测试数据准备2.排序功能2.1 简单字段排序2.2 多字段排序2.3 日期排序 3.分页功能3.1 基础分页3.2 深度分页&#xff08;不推荐大数据量使用&#xff09;3.3 使用 search_after 进行高效分页 4.综合示例&#xff1a;高亮排序分页5.实践建议 1.测试数据准备…

Delta、Jackknife、Bootstrap

用班级平均身高的案例&#xff0c;展示 ​Delta、Jackknife、Bootstrap​ 的完整计算过程。 ​0. 数据准备​ ​原始数据&#xff08;4个学生的身高&#xff09;​​&#xff1a; 真实均值&#xff08;目标统计量&#xff09;​​&#xff1a; ​1. Delta 方法&#xff08;公式…

企业智脑技术架构设计:紧贴企业场景规划面向未来的发展趋势与实现路径

摘要 本文深入探讨了企业智脑技术架构的设计理念与发展趋势&#xff0c;分析了当前企业智能化转型的技术需求与挑战&#xff0c;提出了一个面向未来的企业智脑技术架构设计方案。文章从底层技术支撑、核心能力构建、应用场景适配、安全合规保障以及未来发展路径五个维度展开论…

新手向:Python方向讲解

从NASA火星任务到TikTok推荐算法&#xff0c;从自动化脚本到量子计算&#xff0c;Python用import antigravity重新定义了编程边界 一、设计哲学&#xff1a;优雅明确的编程禅学 Python之禅&#xff08;import this&#xff09;&#xff1a; 优美胜于丑陋&#xff08;Beautifu…