我们使用ElasticJob需要以下步骤: 1. 添加依赖 2. 配置任务(可以使用Spring命名空间配置或Java配置) 3. 实现任务逻辑(实现SimpleJob、DataflowJob等接口) 4. 启动任务 下面是一个详细的示例,包括Spring Boot集成ElasticJob的步骤。 注意:ElasticJob有两个版本:ElasticJob-Lite和ElasticJob-Cloud。我们这里使用ElasticJob-Lite(轻量级无中心化解决方案)。 步骤1:添加Maven依赖 在pom.xml中添加以下依赖:```xml<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-boot-starter</artifactId> <version>3.0.1</version> <!-- 请使用最新版本 --></dependency>``` 步骤2:配置ElasticJob(这里使用Spring Boot的application.properties配置) 在application.properties中配置注册中心(Zookeeper)和任务参数:```properties# 注册中心配置elasticjob.reg-center.server-lists=localhost:2181elasticjob.reg-center.namespace=elasticjob-demo# 任务配置(这里配置一个简单任务)elasticjob.jobs.mySimpleJob.elastic-job-class=com.example.demo.job.MySimpleJobelasticjob.jobs.mySimpleJob.cron=0/5 * * * * ?elasticjob.jobs.mySimpleJob.sharding-total-count=3elasticjob.jobs.mySimpleJob.overwrite=true``` 步骤3:编写任务类(实现SimpleJob接口)```javapackage com.example.demo.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;public class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { // 根据分片项执行任务 System.out.println("分片项: " + shardingContext.getShardingItem() + ", 总分片数: " + shardingContext.getShardingTotalCount() + ", 任务参数: " + shardingContext.getJobParameter()); // 实际业务逻辑 switch (shardingContext.getShardingItem()) { case 0: // 处理第一片数据 break; case 1: // 处理第二片数据 break; case 2: // 处理第三片数据 break; } }}``` 步骤4:启动Spring Boot应用 如果需要使用Java配置方式(代替properties配置),可以创建一个配置类:```javaimport org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ElasticJobConfig { @Autowired private ZookeeperRegistryCenter regCenter; @Bean(initMethod = "schedule") public ScheduleJobBootstrap mySimpleJob(MySimpleJob mySimpleJob) { return new ScheduleJobBootstrap(regCenter, mySimpleJob, JobConfiguration.newBuilder("mySimpleJob", 3) // 任务名称和分片总数 .cron("0/5 * * * * ?") // cron表达式 .jobParameter("参数") // 任务参数 .overwrite(true) // 是否覆盖配置 .build()); }}``` 注意:如果使用Java配置,需要同时配置Zookeeper注册中心:```java@Beanpublic ZookeeperRegistryCenter regCenter() { // 配置Zookeeper ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elasticjob-demo"); return new ZookeeperRegistryCenter(zkConfig);}``` 步骤5:分片策略 ElasticJob可以将任务拆分成多个分片,每个分片由不同的服务器执行。你可以在任务中根据分片项处理不同的数据。 例如,处理数据库数据时,可以这样分片:```java@Overridepublic void execute(ShardingContext shardingContext) { // 获取分片项和总分片数 int shardingItem = shardingContext.getShardingItem(); int shardingTotalCount = shardingContext.getShardingTotalCount(); // 根据分片项查询数据(例如:按ID取模) List<Long> data = getDataFromDatabase(); for (Long id : data) { if (id % shardingTotalCount == shardingItem) { // 处理这个数据 process(id); } }}``` 步骤6:监听器(可选) 你可以添加任务监听器来监控任务执行情况:```javapublic class MyJobListener implements ElasticJobListener { @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { System.out.println("任务开始"); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { System.out.println("任务结束"); }}``` 然后在任务配置中添加监听器:```java@Bean(initMethod = "schedule")public ScheduleJobBootstrap mySimpleJob(MySimpleJob mySimpleJob) { return new ScheduleJobBootstrap(regCenter, mySimpleJob, JobConfiguration.newBuilder("mySimpleJob", 3) .cron("0/5 * * * * ?") .jobListenerTypes("myJobListener") // 监听器名称(需要将监听器注册为Bean) .overwrite(true) .build());}// 注册监听器Bean@Beanpublic MyJobListener myJobListener() { return new MyJobListener();}``` 步骤7:错误处理 任务执行过程中出现异常,ElasticJob会进行重试(可配置)并记录错误日志。你也可以在任务中自己捕获异常处理。 总结: 使用ElasticJob的关键步骤: 1. 引入依赖 2. 配置注册中心(Zookeeper) 3. 定义任务(实现SimpleJob等接口) 4. 配置任务(cron表达式、分片数等) 5. 启动任务(通过Spring Boot自动启动或手动调度) 更多详细配置请参考官方文档:https://shardingsphere.apache.org/elasticjob/current/cn/overview/
ElasticJob 使用详解
ElasticJob 是一个分布式调度解决方案,提供分布式任务调度、弹性扩容缩容、作业分片等功能。下面我将详细介绍如何在 Spring Boot 中使用 ElasticJob。
环境准备
1. 添加 Maven 依赖
xml
<dependencies><!-- ElasticJob 核心依赖 --><dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.1</version> <!-- 使用最新版本 --></dependency><!-- Zookeeper 注册中心 --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version></dependency> </dependencies>
2. 配置文件 (application.yml)
yaml
elasticjob:reg-center:server-lists: localhost:2181 # Zookeeper 地址namespace: elasticjob-demo # 命名空间
基础使用
1. 创建简单任务
java
import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.springframework.stereotype.Component;@Component public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {// 获取分片信息int shardingItem = context.getShardingItem();int totalShards = context.getShardingTotalCount();System.out.printf("执行分片任务: %d/%d, 参数: %s%n", shardingItem, totalShards,context.getJobParameter());// 业务逻辑处理switch (shardingItem) {case 0: processShard0();break;case 1:processShard1();break;// ... 其他分片}}private void processShard0() {// 处理分片0的任务}private void processShard1() {// 处理分片1的任务} }
2. 配置任务
java
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class ElasticJobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Autowiredprivate MySimpleJob mySimpleJob;@Bean(initMethod = "schedule")public ScheduleJobBootstrap scheduleMySimpleJob() {return new ScheduleJobBootstrap(regCenter, mySimpleJob, createJobConfiguration());}private JobConfiguration createJobConfiguration() {// 创建作业配置return JobConfiguration.newBuilder("mySimpleJob", 3) // 作业名称,分片总数.cron("0/5 * * * * ?") // 每5秒执行一次.shardingItemParameters("0=A,1=B,2=C") // 分片参数.jobParameter("globalParam") // 作业自定义参数.overwrite(true) // 本地配置是否覆盖注册中心配置.build();} }
核心功能详解
1. 分片机制
ElasticJob 的核心功能是将任务拆分成多个分片,由不同节点并行处理:
java
public void execute(ShardingContext context) {// 获取当前分片项int shardingItem = context.getShardingItem();// 获取所有待处理数据IDList<Long> allIds = fetchAllDataIds();// 根据分片规则处理数据for (Long id : allIds) {if (id % context.getShardingTotalCount() == shardingItem) {processData(id);}} }
2. 弹性扩容
当增加或减少服务实例时,ElasticJob 会自动重新分配分片:
text
初始状态(2节点): 节点1:分片0,1 节点2:分片2扩容后(3节点): 节点1:分片0 节点2:分片1 节点3:分片2
3. 任务类型
3.1 简单任务 (SimpleJob)
java
public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {// 实现业务逻辑} }
3.2 数据流任务 (DataflowJob)
java
public class MyDataflowJob implements DataflowJob<Order> {@Overridepublic List<Order> fetchData(ShardingContext context) {// 获取待处理数据return fetchUnprocessedOrders(context);}@Overridepublic void processData(ShardingContext context, List<Order> data) {// 处理数据processOrders(data);} }
3.3 脚本任务 (ScriptJob)
yaml
elasticjob:jobs:myScriptJob:job-type: SCRIPTprops:script.command-line: "echo Hello ElasticJob"
4. 事件追踪
java
@Bean public ScheduleJobBootstrap scheduleJob() {return new ScheduleJobBootstrap(regCenter,myJob,JobConfiguration.newBuilder("myJob", 3).cron("0/5 * * * * ?").eventTraceRdbDataSource("dataSource") // 配置数据源.build()); }
5. 故障转移
java
JobConfiguration.newBuilder("myJob", 3).cron("0/5 * * * * ?").failover(true) // 开启故障转移.build()
高级配置
1. 作业监听器
java
public class MyJobListener implements ElasticJobListener {@Overridepublic void beforeJobExecuted(ShardingContexts contexts) {System.out.println("作业开始执行: " + contexts.getJobName());}@Overridepublic void afterJobExecuted(ShardingContexts contexts) {System.out.println("作业执行完成: " + contexts.getJobName());} }// 注册监听器 @Bean public ElasticJobListener myJobListener() {return new MyJobListener(); }// 配置到任务 JobConfiguration.newBuilder("myJob", 3).jobListenerTypes("myJobListener") // 监听器Bean名称.build()
2. 作业错误处理策略
java
JobConfiguration.newBuilder("myJob", 3).jobErrorHandlerType("LOG") // 错误处理策略.build()
支持策略:
-
LOG
:记录日志不中断作业 -
THROW
:抛出异常中断作业 -
IGNORE
:忽略异常 -
EMAIL
:发送邮件(需自定义)
3. 自定义错误处理器
java
public class MyErrorHandler implements JobErrorHandler {@Overridepublic void handleException(String jobName, Throwable cause) {// 自定义错误处理逻辑sendAlert(jobName, cause);} }// 注册为Bean @Bean public JobErrorHandler myErrorHandler() {return new MyErrorHandler(); }// 配置到任务 JobConfiguration.newBuilder("myJob", 3).jobErrorHandlerType("myErrorHandler").build()
Spring Boot 自动配置
1. 使用 @ElasticJobScheduler
注解
java
@ElasticJobScheduler(name = "myAutoJob", cron = "0/10 * * * * ?",shardingTotalCount = 3,jobParameters = "autoParam",overwrite = true ) public class MyAutoJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {System.out.println("自动配置任务执行");} }
2. 配置文件配置
yaml
elasticjob:jobs:myConfigJob:elastic-job-class: com.example.MySimpleJobcron: 0/5 * * * * ?sharding-total-count: 3sharding-item-parameters: 0=A,1=B,2=Cjob-parameters: configParamoverwrite: truejob-error-handler-type: LOG
运维管理
1. 控制台安装
-
下载 ElasticJob-Lite-UI
-
启动:
bash
tar -zxvf apache-shardingsphere-elasticjob-${version}-lite-ui-bin.tar.gz cd apache-shardingsphere-elasticjob-${version}-lite-ui-bin bin/start.sh
2. 常用 REST API
-
查看作业状态:
GET /api/jobs/{jobName}/status
-
触发作业执行:
POST /api/jobs/{jobName}/trigger
-
禁用作业:
POST /api/jobs/{jobName}/disable
-
启用作业:
POST /api/jobs/{jobName}/enable
最佳实践
1. 分片策略设计
java
public void execute(ShardingContext context) {// 1. 获取所有分片数据List<Long> allIds = getAllDataIds();// 2. 计算分片处理的数据范围List<Long> shardIds = allIds.stream().filter(id -> id % context.getShardingTotalCount() == context.getShardingItem()).collect(Collectors.toList());// 3. 处理分片数据processShardData(shardIds); }
2. 数据库分片示例
java
@Autowired private OrderRepository orderRepository;public void execute(ShardingContext context) {// 根据分片查询数据Pageable pageable = PageRequest.of(0, 100);int mod = context.getShardingTotalCount();int remainder = context.getShardingItem();List<Order> orders = orderRepository.findByStatusAndShard("PENDING",mod,remainder,pageable);// 处理订单processOrders(orders); }// Repository 接口 @Query("SELECT o FROM Order o WHERE o.status = :status " + "AND MOD(o.id, :mod) = :remainder") List<Order> findByStatusAndShard(@Param("status") String status,@Param("mod") int mod,@Param("remainder") int remainder,Pageable pageable);
3. 幂等性设计
java
public void processOrder(Order order) {// 1. 检查是否已处理if (order.isProcessed()) {return;}// 2. 加分布式锁try (Lock lock = lockService.acquireLock("order_" + order.getId())) {// 3. 再次检查(双重检查)if (orderRepository.isProcessed(order.getId())) {return;}// 4. 处理业务process(order);// 5. 更新状态order.markAsProcessed();orderRepository.save(order);} }
常见问题解决
1. Zookeeper 连接问题
yaml
# 增加连接超时配置 elasticjob:reg-center:server-lists: localhost:2181namespace: elasticjob-demobase-sleep-time-milliseconds: 1000max-sleep-time-milliseconds: 3000max-retries: 3
2. 任务不执行检查
-
确认 Zookeeper 是否正常运行
-
检查作业配置是否同步到注册中心
-
查看节点日志是否有异常
-
确认作业是否被禁用
3. 分片不均问题
使用自定义分片策略:
java
public class CustomShardingStrategy implements JobShardingStrategy {@Overridepublic Map<String, List<Integer>> sharding(List<String> servers, String jobName, int shardingTotalCount) {// 自定义分片逻辑Map<String, List<Integer>> result = new LinkedHashMap<>();for (int i = 0; i < servers.size(); i++) {result.put(servers.get(i), new ArrayList<>());}for (int i = 0; i < shardingTotalCount; i++) {int index = i % servers.size();result.get(servers.get(index)).add(i);}return result;} }// 配置使用 JobConfiguration.newBuilder("myJob", 3).jobShardingStrategyType("com.example.CustomShardingStrategy").build()
总结
ElasticJob 使用核心步骤:
-
添加依赖:引入 ElasticJob 和 Zookeeper 依赖
-
配置注册中心:在配置文件中设置 Zookeeper 地址
-
实现任务:根据需求选择 SimpleJob/DataflowJob/ScriptJob
-
配置任务:通过 Java Config 或注解配置任务参数
-
分片设计:合理设计分片策略处理分布式数据
-
运维管理:使用控制台或 API 管理任务
关键优势:
-
分布式调度:支持任务在集群中分片执行
-
弹性扩容:自动感知节点变化并重新分片
-
故障转移:自动转移失败任务分片
-
作业治理:提供丰富的运维管理功能
生产环境建议:
-
为 Zookeeper 配置集群(至少3节点)
-
重要任务配置事件追踪和监控报警
-
定期检查控制台的任务执行情况
-
设计任务时考虑幂等性和事务一致性
通过合理使用 ElasticJob,可以构建高可靠、可扩展的分布式任务调度系统,有效解决传统定时任务在分布式环境中的各种问题。