引言
关于Netty和Websocket的介绍我就不多讲了,网上一搜一大片。现如今AI的趋势发展很热门,长连接对话也是会经常接触到的,使用Websocket实现长连接,那么很多人为了快速开发快速集成就会使用spring-boot-starter-websocket依赖快速实现,但是注意该实现是基于tomcat的,有性能瓶颈的,那么就又有人说了那我使用spring-webflux(底层容器就是netty),但是用的人很少,那能不能单独一个项目来处理长连接呢?
那肯定有的,基于netty自己实现
怎么使用?
其实怎么说呢,netty实现的websocket网上也是一大把,但是终究是个demo,网上也是很多人问:怎么实现动态多路由,像mvc一样多个路由呢?用过spring-boot-starter-websocket都知道,搭配onOpen、onMesssage、onClose注解轻松使用,使用@ServerEndpoint实现多路由,那么netty怎么实现呢(netty本身是不支持的,都是需要自己去实现)?
我们要明白netty的定位,高性能、异步事件驱动的网络应用框架,主要用于快速开发可维护的高性能协议服务器和客户端,提供底层网络 I/O 的抽象,全是抽象,需要自己去自定义实现。
正题开始
废话就不多说了,直接上代码,如果文章对你有帮助,请3连!!!
maven依赖,只用netty和spring,不需要web容器:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.65.Final</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
1、核心接口WebSocketHandler
//Websocket处理器接口,入口WebSocketGlobalIntercept将会封装连接处理逻辑和事件通知逻辑,专注实现业务
public interface WebSocketHandler {/*** 当首次握手连接成功后(升级为websocket时)将会触发,可用于连接合法性处理** @param session 会话对象*/void onOpen(WebSocketSession session);/*** 当触发websocket消息帧时,将会通知该方法** @param session 会话对象* @param message 消息对象:文本、二进制数据等*/void onMessage(WebSocketSession session, WebSocketFrame message);/*** 当连接关闭时将通知该方法,需要释放资源并且清理session** @param session 会话对象*/void onClose(WebSocketSession session);/*** 当连接过程中、通信过程中出现异常将通知该方法** @param session 会话对象* @param error 异常信息*/void onError(WebSocketSession session, Throwable error);
}
2、会话Session类
public class WebSocketSession {/*** netty channelContext 对象,注意此对象不可序列化*/private ChannelHandlerContext channelContext;/*** 请求路由路径*/private String path;/*** 扩展参数map,如需自定义携带参数时即可用于存入*/private Map<String, Object> attributes = new ConcurrentHashMap<>();/*** 只提供一个有参构造方法,channelContext和 path不能为空** @param channelContext channel上下文* @param path 请求路径* @param attributes 扩展参数map*/public WebSocketSession(ChannelHandlerContext channelContext, String path, Map<String, Object> attributes) {this.channelContext = channelContext;this.path = path;this.attributes = attributes;}/*** 提供一个静态方法获取对象** @param channelContext channel上下文* @param path 请求路径* @param attributes 扩展参数map* @return*/public static WebSocketSession of(ChannelHandlerContext channelContext, String path, Map<String, Object> attributes) {return new WebSocketSession(channelContext, path, attributes);}/*** 发送TextWebSocketFrame消息** @param text 消息文本*/public void sendText(String text) {this.channelContext.writeAndFlush(new TextWebSocketFrame(text));}/*** 发送BinaryWebSocketFrame 二进制消息** @param data*/public void sendBinary(ByteBuf data) {this.channelContext.writeAndFlush(new BinaryWebSocketFrame(data));}/*** 处理心跳检测ping消息,响应pong** @param frame pong消息帧*/public void sendPong(PongWebSocketFrame frame) {this.channelContext.writeAndFlush(frame);}/*** 强制关闭连接*/public void close() {this.channelContext.close();}/*** 优雅关闭连接,其实就是发送了关闭协议帧** @param frame 关闭帧*/public void close(CloseWebSocketFrame frame) {this.channelContext.writeAndFlush(frame.retain()).addListener(ChannelFutureListener.CLOSE);}/*** 优雅关闭连接,其实就是发送了关闭协议帧** @param reason 关闭原因*/public void close(String reason) {CloseWebSocketFrame frame = new CloseWebSocketFrame(WebSocketCloseStatus.SERVICE_RESTART,reason);close(frame);}/*** set自定义扩展值** @param name 名称* @param value 值*/public void setAttribute(String name, Object value) {this.attributes.put(name