WebApplicationType.REACTIVE 的webSocket

通用请求体类

@Data
@ApiModel("websocket请求消息")
public class WebSocketRequest<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 参考:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum;*/private @NotNull(message = "业务操作类型不能为空") Integer aiBroadcastEventEnum;private T data;public T getRealData(Class<T> clazz) {if (this.data == null) {return null;} else {String jsonStr = JsonUtil.toJsonStr(this.data);return (T) JsonUtil.parseObject(jsonStr, clazz);}}}

通用响应类

@ApiModel("websocket响应消息")
@Data
public class WebSocketResponse<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 参考枚举:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer aiBroadcastEventEnum;private String code;private String msg;private T data;public static <T> Mono<WebSocketResponse<T>> ok(Integer eventEnum, T data) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());response.setData(data);return Mono.just(response);}public static Mono<WebSocketResponse<Void>> ok(Integer eventEnum) {WebSocketResponse<Void> response = new WebSocketResponse<Void>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());return Mono.just(response);}public static <T> Mono<WebSocketResponse<T>> fail(Integer eventEnum, RespStatusEnum status, String err) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(status.getCode());response.setMsg(err);return Mono.just(response);}}

连接类型类

@Data
@Accessors(chain = true)
public class ConnectDTO {/*** 连接类型 参考枚举:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer type;}
  1. 配置类
@Configuration
public class WebFluxWebSocketConfig {/** 让 Spring 注入已经带依赖的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler),   // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
  1. 实现类
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {wsSessionPool.add(session);String sid = session.getId();// 处理客户端请求消息,生成响应消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服务端广播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并两个流,确保 session.send 只调用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 关闭,sessionId:{},signal:{}", session.getId(), sig);});}
}

3.处理类 aiBroadcastEventEnum 是枚举类型,根据不同的枚举类型进入不同的处理类进行处理不同的消息返回


@Component
@Slf4j
public class AiBroadcastEventHandlerDispatcher<T, R> {private final Map<Integer, AiBroadcastEventHandler<T, R>> eventMap = new HashMap<>();/** 由 Spring 注入所有事件处理器 */public AiBroadcastEventHandlerDispatcher(List<AiBroadcastEventHandler<T, R>> handlers) {handlers.forEach(h -> eventMap.put(h.aiBroadcastEvent(), h));}/*** 处理前端发来的 Payload 并把响应写回当前 session** @param session 当前 WebFlux Session* @param payload 文本 JSON* @return Mono<Void> 供调用方链式订阅*/public Mono<String> doDispatch(WebSocketSession session, String payload) {WebSocketRequest<R> webSocketRequest = JsonUtil.parseObject(payload, new TypeReference<WebSocketRequest<R>>() {});log.info("webSocketRequest:{}, sessionID:{}", webSocketRequest, session.getId());// 发送响应并记录日志return handlerRequest(webSocketRequest, session).map(JsonUtil::toJson).doOnNext(json -> log.info("准备发送: {}", json)).onErrorResume(e -> {log.error("发送异常", e);return Mono.just(JsonUtil.toJson(WebSocketResponse.fail(webSocketRequest != null ? webSocketRequest.getAiBroadcastEventEnum() : null,RespStatusEnum.INTERNAL_SERVICE_ERROR,"系统异常:" + e.getMessage())));});}/** 实际路由到具体 Handler */private Mono<WebSocketResponse<T>> handlerRequest(WebSocketRequest<R> req, WebSocketSession session) {if (ObjectUtil.isNull(req) || !eventMap.containsKey(req.getAiBroadcastEventEnum())) {return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,"aiBroadcastEventEnum not find");}try {return eventMap.get(req.getAiBroadcastEventEnum()).handler(req, session);} catch (Exception e) {log.error("websocket 处理消息异常,webSocketRequest:{}, sessionID:{}",req, session.getId(), e);return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR, e.getMessage());}}}
  1. 接口
public interface AiBroadcastEventHandler<T, R> {/*** 对应事件枚举值(例如 MAP_ALARM_CHARGING 的 code)*/Integer aiBroadcastEvent();/*** 执行处理逻辑,并返回响应,最终由 dispatcher 推送给前端** @param webSocketRequest 请求体* @param session          当前连接* @return Mono<WebSocketResponse<T>> 最终会转成 JSON 发给前端*/Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);/*** 校验参数*/void validateParam(WebSocketRequest<R> webSocketRequest) throws ParameterException;
  1. 通用处理逻辑
public abstract class AbstractAiBroadcastEventHandler<T, R>implements AiBroadcastEventHandler<T, R> {/*** websocket 请求事件处理统一流程:参数校验 → 业务处理*/@Overridepublic Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest,WebSocketSession session) {try {validateParam(webSocketRequest);return doHandler(webSocketRequest, session);} catch (ParameterException e) {return WebSocketResponse.fail(webSocketRequest.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,e.getMessage());}}/*** 子类实现真正的业务逻辑:*   1. 更新本地 sessionId / Redis 映射*   2. 查询并返回最新业务数据*/public abstract Mono<WebSocketResponse<T>> doHandler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);
  1. 实现类
@Component
@Slf4j
public class SubscribeFireContentHandler extends AbstractAiBroadcastEventHandler<VO, ConnectDTO> implements AiBroadcastEventHandler<VO, ConnectDTO>{@Autowiredprivate BizServiceSafeScreenClient bizServiceSafeScreenClient;@Overridepublic Integer aiBroadcastEvent() {return AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode();}@Overridepublic Mono<WebSocketResponse<VO>> doHandler(WebSocketRequest<ConnectDTO> webSocketRequest, WebSocketSession session) {log.info("SubscribeFireContentHandler doHandler start");//session订阅该订单数据ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (!AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode().equals(dto.getType())) {return Mono.error(new RespException("参数异常", RespStatusEnum.CLIENT_ERROR));}// 查询火灾站 前端拼接内容FinalResultVO<VO> xxx = 调用接口获取数据;if (xxx  != null) {Mono<WebSocketResponse<FireStationVO>> ok = WebSocketResponse.ok(AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode(), xxx);return ok;}return Mono.empty();}@Overridepublic void validateParam(WebSocketRequest<ConnectDTO> webSocketRequest) {ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (ObjUtil.isNull(dto.getType())) {throw new RespException("参数异常", RespStatusEnum.CLIENT_ERROR);}}

7.心跳

@Component
@Log4j2
public class WsHeartbeatTask {private final WsSessionPool wsSessionPool;public WsHeartbeatTask(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@PostConstructpublic void init() {log.info("WebSocket心跳任务已启动");}// 每30秒广播一个心跳消息@Scheduled(fixedRate = 30_000)public void sendHeartbeat() {String timeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);wsSessionPool.broadcast(json);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/89125.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/89125.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

降本增效!自动化UI测试平台TestComplete并行测试亮点

在跨平台自动化测试中&#xff0c;企业常面临设备投入高、串行测试耗时长、测试覆盖率难以兼顾的困境。自动化UI测试平台TestComplete的并行测试引擎提供了有效的解决方案&#xff1a;通过云端海量设备池与CI/CD深度集成&#xff0c;实现多平台、多浏览器并行测试&#xff0c;显…

云、实时、时序数据库混合应用:医疗数据管理的革新与展望(上)

云、实时、时序数据库混合应用:医疗数据管理的革新与展望 1、引言 1.1 研究背景与意义 在信息技术飞速发展的当下,医疗行业正经历着深刻的数字化转型。这一转型不仅是技术层面的革新,更是关乎医疗体系未来发展方向的深刻变革。从医疗服务的提供方式,到医疗管理的模式,再…

代码随想录算法训练营十六天|二叉树part06

LeetCode 530 二叉搜索树的最小绝对差 题目链接&#xff1a;530. 二叉搜索树的最小绝对差 - 力扣&#xff08;LeetCode&#xff09; 给你一个二叉搜索树的根节点 root &#xff0c;返回 树中任意两不同节点值之间的最小差值 。 差值是一个正数&#xff0c;其数值等于两值之差…

自增主键为什么不是连续的?

前言 如果一个线程回滚&#xff0c;例如唯一键冲突的情况回滚时&#xff0c;回滚了sql语句&#xff0c;但是并没有把自增的值也-1。那么就会导致下一条插入的数据自增id出现了跳跃。 自增主键为什么不是连续的&#xff1f;前言执行时机为什么自增主键不是连续的为什么不回滚自…

OpenCV图像基本操作:读取、显示与保存

在图像处理项目中&#xff0c;图像的 读取&#xff08;imread&#xff09;、显示&#xff08;imshow&#xff09; 和 保存&#xff08;imwrite&#xff09; 是最基础也是最常用的三个操作。本文将详细介绍这三个函数的功能、用法和注意事项&#xff0c;并提供一个完整示例供读者…

.NET控制台应用程序中防止程序立即退出

在VB.NET控制台应用程序中防止程序立即退出&#xff0c;主要有以下几种常用方法&#xff0c;根据需求选择适合的方案&#xff1a; 方法1&#xff1a;等待用户输入&#xff08;推荐&#xff09; Module Module1Sub Main()Console.WriteLine("程序开始运行...") 这里是…

Vue3 + Three.js 极速入门:打造你的第一个3D可视化项目

文章目录前言一、环境准备1.1 创建Vue3项目1.2 安装Three.js二、Three.js核心概念速览三、实战&#xff1a;创建旋转立方体3.1 组件化初始化四、核心代码解析4.1 Vue3响应式整合技巧4.2 性能优化要点五、进阶功能扩展5.1 数据驱动控制5.2 加载3D模型六、常见问题解决七、资源推…

【设计模式】享元模式(轻量级模式) 单纯享元模式和复合享元模式

享元模式&#xff08;Flyweight Pattern&#xff09;详解一、享元模式简介 享元模式&#xff08;Flyweight Pattern&#xff09; 是一种 结构型设计模式&#xff08;对象结构型模式&#xff09;&#xff0c;它通过共享技术实现相同或相似对象的重用&#xff0c;以减少内存占用和…

驱动开发_2.字符设备驱动

目录1. 什么是字符设备2. 设备号2.1 设备号概念2.2 通过设备号dev分别获取主、次设备号的宏函数2.3 主设备号的申请静态申请动态分配2.4 注销设备号3. 字符设备3.1 注册字符设备3.2 注销字符设备3.3 应用程序和驱动程序的关系3.4 file_opertaions结构体3.5 class_create3.6 创建…

直播推流技术底层逻辑详解与私有化实现方案-以rmtp rtc hls为例-优雅草卓伊凡

直播推流技术底层逻辑详解与私有化实现方案-以rmtp rtc hls为例-优雅草卓伊凡由于我们的甲方客户要开始为我们项目产品上加入私有化的直播&#xff0c;这块不得不又捡起来曾经我们做直播推流的事情了&#xff0c;其实私有化直播一直并不是一件容易的事情&#xff0c;现在大部分…

一文读懂现代卷积神经网络—深度卷积神经网络(AlexNet)

目录 深度卷积神经网络&#xff08;AlexNet&#xff09;是什么&#xff1f; 一、AlexNet 的核心创新 1. 深度架构 2. ReLU 激活函数 3. 数据增强 4. Dropout 正则化 5. GPU 并行计算 6. 局部响应归一化&#xff08;LRN&#xff09; 二、AlexNet 的网络结构 三、AlexN…

JVM 垃圾收集算法全面解析

1. 引言1.1 为什么需要垃圾收集&#xff1f;在Java应用中&#xff0c;垃圾收集&#xff08;Garbage Collection&#xff0c;GC&#xff09;是一个至关重要的机制&#xff0c;它使得开发者不需要手动管理内存。与传统的语言&#xff08;如C或C&#xff09;不同&#xff0c;Java通…

Vmware中安装的CentOS7如何扩展硬盘大小

起初创建虚拟机时&#xff0c;大小设置不合理&#xff0c;导致我在尝试开源项目时空间不足重新扩展硬盘&#xff0c;不仅需要在虚拟机设置中配置&#xff0c;还需要在系统内重新进行分区一、虚拟机设置打开虚拟机设置→硬盘→扩展&#xff0c;将大小设置为自己期望的大小&#…

Python+MongoDB高效开发组合

如大家所知&#xff0c;Python与MongoDB的结合是一种高效的开发组合&#xff0c;主要用于通过Python进行数据存储、查询及管理&#xff0c;利用MongoDB的文档型数据库特性实现灵活的数据处理。下面让 Python 连接上 MongoDB&#xff1a;安装 PyMongo&#xff1a;pip3 install p…

【论文阅读】Masked Autoencoders Are Effective Tokenizers for Diffusion Models

introduce什么样的 latent 空间更适合用于扩散模型&#xff1f;作者发现&#xff1a;相比传统的 VAE&#xff0c;结构良好、判别性强的 latent 空间才是 diffusion 成功的关键。研究动机&#xff1a;什么才是“好的 latent 表征”&#xff1f;背景&#xff1a;Diffusion Models…

每日一SQL 【游戏玩法分析 IV】

文章目录问题案例执行顺序使用分组解决问题 案例 执行顺序 SQL 语句的执行顺序&#xff08;核心步骤&#xff09; 同一层级的select查询内部, 别名在整个 SELECT 计算完成前不生效 使用分组解决 select distinct s.product_id, Product.product_name from Sales sleft join …

内部文件审计:企业文件服务器审计对网络安全提升有哪些帮助?

企业文件服务器审计工作不仅对提升企业网络信息安全起到重要作用&#xff0c;还能对企业内部网络文件信息是否合规进行判断。因此企业文件服务器审计一直被高度重视。 一、文件服务器为何成为攻击焦点&#xff1f; 企业文件服务器通常集中存储财务报表、人事档案、研发资料、客…

FusionOne HCI 23 超融合实施手册(超聚变超融合)

产品介绍 FusionOne HCI作为实现企业信息一体化的IT基础设施平台&#xff0c;以“软硬件垂直深度集成和调优”、“快速部署”、“统一管理”的理念&#xff0c;提供应用融合部署&#xff0c;提升核心业务运作效率&#xff0c;降低整体采购成本。 FusionOne HCI代表了IT产品的…

AI算姻缘测算小工具流量主微信小程序开源

功能特点 响应式设计&#xff1a;完美适配各种移动设备屏幕尺寸 精美UI界面&#xff1a; 柔和的粉红色渐变背景 圆角卡片设计 精心设计的字体和间距 爱心图标点缀 动态效果&#xff1a; 点击按钮时的动画反馈 测算结果的平滑过渡动画 爱心漂浮动画 进度条动态填充 AI测算功能&a…

Vue获取上传Excel文件内容并展示在表格中

一、安装依赖 npm install xlsx 二、引用依赖 import XLSX from xlsx 三、代码实现 1、注意&#xff1a;函数 analysis 中reader.readAsBinaryString(file)&#xff0c;file的数据格式如图所示 2、示例代码 <!-- 项目使用的前端框架为非流行框架&#xff0c;主要关注…