使用SSE解决获取状态不一致问题
- 1. 问题描述
- 2. SSE介绍
- 2.1 SSE 的工作原理
- 2.2 SSE 的事件格式规范
- 2.3 SSE与其他技术对比
- 2.4 SSE 的优缺点
- 3. 实战代码
1. 问题描述
目前做的一个功能是上传多个文件,这个上传文件是整体功能的一部分,文件在上传的过程中需要轮询文件上传的状态的接口,还有要调用整个任务状态的接口,因为我们的Mysql是单例的,有多个大文件上传的时候会出现任务状态不一致的问题,经过调研有WebSocket与SSE两种方式,但是我只需要把信息推给前端,所以使用个更轻量的SSE。
2. SSE介绍
SSE(Server-Sent Events) 是 HTML5 标准中的一项技术,它允许服务器通过单向连接持续地将数据推送给客户端(通常是浏览器)。
- 与传统的 HTTP 请求响应不同,SSE 是一种服务器主动推送数据的机制。
- 与 WebSocket 不同,它是单向通信:服务器 -> 客户端。
SSE 使用标准的 HTTP 协议 和 文本/event-stream 格式,通常用于实时消息推送,如股票行情、社交动态通知、在线状态更新等。
2.1 SSE 的工作原理
- 客户端通过普通的 HTTP 请求发起连接:
GET /events HTTP/1.1
Accept: text/event-stream
- 服务器响应:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
- 事件流
event: message
id: 123
data: 这是要推送的数据
- 客户端自动处理每一条数据,并在连接断开时自动重连。
2.2 SSE 的事件格式规范
每条消息由一组字段组成,字段以冒号:
分隔,每条消息之间由两个换行符 \n\n
分隔:
示例:
id: 1001
event: update
data: {"progress": 50}
2.3 SSE与其他技术对比
2.4 SSE 的优缺点
✅ 优点:
- 使用标准 HTTP 协议,兼容性好;
- 实现简单,开发成本低;
- 支持自动重连机制;
- 支持事件类型、多行数据等;
- 适用于需要实时更新、但无需客户端发送大量消息的场景。
❌ 缺点:
- 只支持单向通信;
- 不支持二进制数据;
- 不支持所有浏览器(如 IE);
- 有并发连接数限制(部分浏览器每个域名默认最大连接数限制);
- 需配置好心跳机制、防止代理服务器中断连接。
3. 实战代码
@GetMapping("/file-upload-status")
@Operation(summary = "文件上传状态SSE")
public SseEmitter fileUploadStatus(@RequestParam("taskCode") String taskCode,@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
log.info("文件上传状态SSE, 请求参数: {}, Last-Event-ID: {}", taskCode, lastEventId);try {return dataConfigService.fileUploadStatus(taskCode, lastEventId);
} catch (Exception e) {log.error("文件上传状态SSE,失败,错误信息:{}", e.getMessage());throw e;}
}/*** 用于存储每个任务对应的 SseEmitter 实例。* key 为任务唯一标识 taskCode,value 为与该任务关联的 SseEmitter。* 使用 ConcurrentHashMap 是为了支持多线程环境下的并发读写,确保线程安全。* <p>* 场景:* - 客户端发起 SSE 连接时,将对应的 SseEmitter 存入此 Map。* - 后台定时任务或上传进度更新时可以通过 taskCode 拿到对应的 emitter 推送数据。* - 当连接断开(如超时、关闭、出错)时,从 Map 中移除对应的 emitter,避免内存泄漏。*/private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();/*** 定时任务线程池,用于定期推送每个任务的上传状态到前端客户端(通过 SseEmitter)。* 使用 Runtime.getRuntime().availableProcessors() 获取当前机器的可用处理器核心数,* 并以此设置线程池大小,合理利用系统资源。* <p>* 特点:* - ScheduledExecutorService 支持定时或周期性任务调度,适合做周期性状态推送。* - 使用线程池而不是单线程可以支持多个任务并发状态推送。* - 可根据系统并发量和任务复杂度调整线程池大小。*/private final ScheduledExecutorService scheduledExecutorService =Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());/*** 缓存最近一次推送的状态(用于断线续传)*/private final Map<String, DataFileUploadStatusVO> lastEventDataMap = new ConcurrentHashMap<>();
@Overridepublic SseEmitter fileUploadStatus(String taskCode, String lastEventId) {// 检查任务代码是否为空if (StringUtils.isBlank(taskCode)) {throw new ServiceException(400, "请求参数不能为空");}// 创建 SseEmitter 实例,设置超时时间为 1 小时(单位:毫秒)SseEmitter emitter = new SseEmitter(60 * 60 * 1000L);// 将当前任务的 emitter 注册到全局 emitterMap 中,用于后续状态推送emitterMap.put(taskCode, emitter);// 定义清理逻辑(连接关闭时自动移除 emitter,避免内存泄漏)Runnable cleanup = () -> {emitterMap.remove(taskCode);lastEventDataMap.remove(taskCode);};// 注册回调:客户端主动关闭连接时调用emitter.onCompletion(cleanup);// 注册回调:连接超时后自动关闭并清理emitter.onTimeout(cleanup);// 注册回调:出现异常时进行日志记录并清理资源emitter.onError(e -> {log.error("文件上传状态SSE 错误,taskCode:{}, 错误信息:{}", taskCode, e.getMessage(), e);cleanup.run();});// 尝试断点续传(只在客户端提供 Last-Event-ID 的情况下)if (StringUtils.isNotBlank(lastEventId)) {DataFileUploadStatusVO lastData = lastEventDataMap.get(taskCode);if (lastData != null) {try {emitter.send(SseEmitter.event().id(lastEventId).name("upload-status").reconnectTime(5000L).data(lastData, MediaType.APPLICATION_JSON));log.info("文件上传状态SSE,断点续传成功,taskCode:{}, lastEventId:{}, 数据:{}", taskCode, lastEventId,JSON.toJSONString(lastData));} catch (IOException e) {log.error("文件上传状态SSE,断点续传失败,taskCode:{}, 错误:{}", taskCode, e.getMessage(), e);}}}// 创建一个原子容器用于持有定时任务引用,以便后续取消任务AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();// 启动定时任务ScheduledFuture<?> future = scheduledExecutorService.scheduleAtFixedRate(() -> {try {// 从全局映射中获取当前连接的 emitter,若为空说明连接已关闭,停止执行SseEmitter currentEmitter = emitterMap.get(taskCode);if (currentEmitter == null) {return;}// 查询文件上传状态List<DataConfigRespVO> dataConfigList = getDataConfigList(taskCode);// 查询任务信息TaskRespVO taskRespVO = taskService.getTask(taskCode);// 将数据库实体转换为状态对象列表List<DataFileUploadStatusVO.FileStatus> fileStatusList = dataConfigList.stream().map(item -> {DataFileUploadStatusVO.FileStatus fileStatus = new DataFileUploadStatusVO.FileStatus();fileStatus.setId(item.getId());fileStatus.setCode(item.getCode());fileStatus.setImportMethod(item.getImportMethod());fileStatus.setFilename(item.getFilename());fileStatus.setFileUrl(item.getFileUrl());fileStatus.setSize(item.getSize());fileStatus.setUploadStatusCode(item.getUploadStatusCode());return fileStatus;}).collect(Collectors.toList());// 构建完整状态返回对象DataFileUploadStatusVO statusVO = new DataFileUploadStatusVO();statusVO.setTaskCode(taskCode);statusVO.setCurrentTime(TimeUtil.getCurrentTime());statusVO.setFileStatusList(fileStatusList);statusVO.setName(taskRespVO.getName());statusVO.setDataType(taskRespVO.getDataType());statusVO.setDataTypeStr(taskRespVO.getDataTypeStr());statusVO.setImportType(taskRespVO.getImportType());statusVO.setCurrentLinkCode(taskRespVO.getCurrentLinkCode());statusVO.setCurrentLink(taskRespVO.getCurrentLink());statusVO.setCurrentLinkStatusCode(taskRespVO.getCurrentLinkStatusCode());statusVO.setCurrentLinkStatus(taskRespVO.getCurrentLinkStatus());String eventId = String.valueOf(System.currentTimeMillis());// 推送状态try {// 设置状态statusVO.setFinished(fileUploadFished(statusVO));// 保存最后一次推送的数据,用于断线续传lastEventDataMap.put(taskCode, statusVO);currentEmitter.send(SseEmitter.event()// 事件 ID,供断线续传.id(eventId)// 事件名.name("upload-status")// 告诉客户端:断线后5秒再重连.reconnectTime(5000L)// 推送数据为 JSON.data(statusVO, MediaType.APPLICATION_JSON));log.info("文件上传状态SSE,当前时间:{},taskCode:{},推送数据:{}", TimeUtil.getCurrentTime(), taskCode,JSON.toJSONString(statusVO));} catch (IOException ioException) {// 客户端断开连接或传输异常,主动清理资源并中止定时任务log.error("文件上传状态SSE,错误:{},taskCode:{}", ioException.getMessage(), taskCode);currentEmitter.completeWithError(ioException);cleanup.run();futureRef.get().cancel(true);return;}// 判断任务是否完成if (fileUploadFished(statusVO)) {// 设置任务完成statusVO.setFinished(true);currentEmitter.send(SseEmitter.event().name("upload-status").data(statusVO, MediaType.APPLICATION_JSON));// 主动关闭连接currentEmitter.complete();// 清理资源并取消定时任务cleanup.run();futureRef.get().cancel(true);log.info("文件上传状态SSE,任务已完成,taskCode:{},当前时间:{},emitterMap:{},大小:{}", taskCode,TimeUtil.getCurrentTime(), JSON.toJSONString(emitterMap), emitterMap.size());}} catch (Exception e) {log.error("文件上传状态SSE, SSE推送失败,taskCode:{},错误信息:{}", taskCode, e.getMessage(), e);SseEmitter failedEmitter = emitterMap.get(taskCode);if (failedEmitter != null) {failedEmitter.completeWithError(e);}cleanup.run();futureRef.get().cancel(true);}// 每2秒执行一次}, 0, 2, TimeUnit.SECONDS);// 记录定时任务引用futureRef.set(future);// 返回 emitter 给前端,保持连接return emitter;}
event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:27","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:29","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:31","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:33","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}