系统设计
架构图
+----------------+ +-----------------+ +----------------+ | | | | | | | 生产者 |------>| Redis ZSet |------>| 定时任务消费者 | | (添加延迟任务) | | (延迟队列存储) | | (扫描并处理任务)| | | | | | | +----------------+ +-----------------+ +----------------+↑ || ↓| +---------------------++-----------------------------------| 任务处理器 || (执行具体业务逻辑) |+---------------------+
核心流程
生产者将任务添加到Redis ZSet中,score为任务执行时间戳
定时任务定期扫描ZSet,找出score小于当前时间的任务
消费者线程池处理到期的任务
任务处理完成后从ZSet中移除
实现步骤
步骤一:添加依赖(pom.xml)
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Redis集成 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 定时任务 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok简化代码 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>
步骤二:配置Redis(application.yml)
spring:redis:host: localhostport: 6379password: database: 0lettuce:pool:max-active: 20max-idle: 10min-idle: 2max-wait: 10000ms# 自定义延迟队列配置
delay:queue:key: "delay_queue" # Redis ZSet键名batch-size: 10 # 每次处理任务数量interval: 5000 # 定时任务执行间隔(ms)thread-pool-size: 5 # 消费者线程池大小
步骤三:创建任务模型(DelayTask.java)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayTask {/*** 任务类型枚举*/public enum TaskType {ORDER_TIMEOUT, // 订单超时处理EMAIL_REMINDER, // 邮件提醒TASK_EXECUTION // 定时任务执行}private TaskType type; // 任务类型private String taskId; // 任务唯一IDprivate String content; // 任务内容private long createTime; // 任务创建时间private long executeTime; // 任务执行时间// 重写toString方法用于序列化@Overridepublic String toString() {return "DelayTask{" +"type=" + type +", taskId='" + taskId + '\'' +", content='" + content + '\'' +", createTime=" + createTime +", executeTime=" + executeTime +'}';}
}
步骤四:创建Redis配置类(RedisConfig.java)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用Jackson序列化GenericJackson2JsonRedisSerializer jacksonSerializer = new GenericJackson2JsonRedisSerializer();// Key序列化template.setKeySerializer(new StringRedisSerializer());// Value序列化template.setValueSerializer(jacksonSerializer);// Hash Key序列化template.setHashKeySerializer(new StringRedisSerializer());// Hash Value序列化template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}
步骤五:创建线程池配置类(ThreadPoolConfig.java)
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class ThreadPoolConfig {@Value("${delay.queue.thread-pool-size:5}")private int threadPoolSize;@Bean("taskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心线程数executor.setCorePoolSize(threadPoolSize);// 最大线程数executor.setMaxPoolSize(threadPoolSize * 2);// 队列大小executor.setQueueCapacity(100);// 线程名前缀executor.setThreadNamePrefix("delay-task-");// 拒绝策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化executor.initialize();return executor;}
}
步骤六:创建延迟队列服务(DelayQueueService.java)
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.Executor;@Slf4j
@Service
public class DelayQueueService {@Value("${delay.queue.key}")private String delayQueueKey;@Value("${delay.queue.batch-size}")private int batchSize;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate Executor taskExecutor;/*** 添加延迟任务* @param task 任务对象* @param delaySeconds 延迟秒数*/public void addTask(DelayTask task, long delaySeconds) {long executeTime = System.currentTimeMillis() + (delaySeconds * 1000);task.setExecuteTime(executeTime);// 添加到Redis ZSetredisTemplate.opsForZSet().add(delayQueueKey, task, executeTime);log.info("添加延迟任务成功, 任务ID: {}, 执行时间: {}", task.getTaskId(), executeTime);}/*** 定时扫描任务(每5秒执行一次)*/@Scheduled(fixedRateString = "${delay.queue.interval}")public void scanExpiredTasks() {long now = System.currentTimeMillis();log.debug("开始扫描延迟队列, 当前时间: {}", now);// 获取当前时间之前的所有任务Set<ZSetOperations.TypedTuple<Object>> tasks = redisTemplate.opsForZSet().rangeByScoreWithScores(delayQueueKey, 0, now, 0, batchSize);if (tasks == null || tasks.isEmpty()) {log.debug("未找到待处理任务");return;}log.info("发现 {} 个待处理任务", tasks.size());for (ZSetOperations.TypedTuple<Object> tuple : tasks) {Object taskObj = tuple.getValue();if (taskObj instanceof DelayTask) {DelayTask task = (DelayTask) taskObj;// 使用线程池异步处理任务taskExecutor.execute(() -> processTask(task));}}}/*** 处理任务* @param task 延迟任务*/@Asyncpublic void processTask(DelayTask task) {try {log.info("开始处理任务: {}", task.getTaskId());// 根据任务类型执行不同逻辑switch (task.getType()) {case ORDER_TIMEOUT:handleOrderTimeout(task);break;case EMAIL_REMINDER:sendReminderEmail(task);break;case TASK_EXECUTION:executeScheduledTask(task);break;default:log.warn("未知任务类型: {}", task.getType());}// 处理完成后从队列中移除redisTemplate.opsForZSet().remove(delayQueueKey, task);log.info("任务处理完成并移除: {}", task.getTaskId());} catch (Exception e) {log.error("任务处理失败: {}", task.getTaskId(), e);handleProcessingError(task);}}// 示例:订单超时处理private void handleOrderTimeout(DelayTask task) {log.info("处理订单超时任务: {}", task.getContent());// 实际业务逻辑:取消订单、释放库存等// 模拟处理时间try {Thread.sleep(1000);} catch (InterruptedException ignored) {}}// 示例:发送提醒邮件private void sendReminderEmail(DelayTask task) {log.info("发送提醒邮件: {}", task.getContent());// 实际业务逻辑:调用邮件服务发送邮件}// 示例:执行定时任务private void executeScheduledTask(DelayTask task) {log.info("执行定时任务: {}", task.getContent());// 实际业务逻辑:执行定时任务}// 错误处理private void handleProcessingError(DelayTask task) {log.error("任务处理失败,加入死信队列: {}", task.getTaskId());// 可以将失败任务移到死信队列redisTemplate.opsForList().rightPush("delay:dead-letter", task);}
}
步骤七:创建测试Controller(DelayQueueController.java)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/delay")
public class DelayQueueController {@Autowiredprivate DelayQueueService delayQueueService;/*** 添加延迟任务* @param type 任务类型 (1-订单超时, 2-邮件提醒, 3-定时任务)* @param seconds 延迟秒数* @param content 任务内容*/@PostMapping("/add")public String addDelayTask(@RequestParam("type") int type,@RequestParam("seconds") long seconds,@RequestParam("content") String content) {// 创建任务IDString taskId = "TASK-" + System.currentTimeMillis();// 转换任务类型DelayTask.TaskType taskType;switch (type) {case 1: taskType = DelayTask.TaskType.ORDER_TIMEOUT; break;case 2: taskType = DelayTask.TaskType.EMAIL_REMINDER; break;case 3: taskType = DelayTask.TaskType.TASK_EXECUTION; break;default: throw new IllegalArgumentException("无效的任务类型");}// 创建任务DelayTask task = new DelayTask(taskType, taskId, content, System.currentTimeMillis(), 0);// 添加任务delayQueueService.addTask(task, seconds);return "任务添加成功! ID: " + taskId;}/*** 查看队列状态*/@GetMapping("/status")public String queueStatus() {long size = delayQueueService.getQueueSize();return "当前延迟队列任务数量: " + size;}
}
启动类(DelayQueueApplication.java)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling // 启用定时任务
@EnableAsync // 启用异步方法
public class DelayQueueApplication {public static void main(String[] args) {SpringApplication.run(DelayQueueApplication.class, args);}
}
方案优势与注意事项
优势
高性能:利用Redis内存操作和ZSet有序特性
低延迟:定时任务扫描保证任务及时处理
高可靠:任务处理失败可进入死信队列
可扩展:线程池支持并行处理多个任务
灵活配置:支持批量处理大小、扫描间隔等参数配置
注意事项
任务幂等性:确保任务可重复处理而不产生副作用
任务超时处理:长时间任务需考虑超时机制
Redis持久化:根据业务需求配置RDB或AOF
分布式环境:多实例部署时需考虑任务竞争问题
监控告警:添加队列积压监控和任务失败告警
扩展建议
添加管理界面:
查看队列中的任务
手动重试失败任务
统计任务处理成功率
分布式锁优化:
// 在scanExpiredTasks方法中 String lockKey = "delay_queue_lock"; Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.SECONDS);if (lockAcquired != null && lockAcquired) {try {// 执行扫描任务逻辑} finally {redisTemplate.delete(lockKey);} }
任务优先级支持:
// 在添加任务时,可将优先级加入score计算 double score = executeTime + (priority * 0.001);
延迟时间精确控制:
使用Redisson的DelayedQueue组件
或使用Redis的Keyspace通知功能
这个实现方案提供了一个完整、可扩展的延迟队列系统,适用于订单超时处理、定时提醒、延迟任务执行等多种业务场景。