基于Netty的TCP Server端和Client端解决正向隔离网闸数据透传问题

背景

因为安装了正向隔离网闸,导致数据传输的时候仅支持TCP协议和UDP协议,因此需要开发TCP Client和Server服务来将数据透传,当前环境是获取的数据并将数据转发到kafka

 1.引入依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.84.Final</version>
</dependency>

2.编写TCP Server端 

TCP Server代码

本代码已经解决TCP的粘包和半包问题,需要通过固定的$符号进行数据分割,使得数据不会错出现粘包和半包问题,可以根据数据大小制定一个不会超过发送消息长度的值

 

package com.huanyu.forward.tcp.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;@Slf4j
@Service("tcpServer")
@ConditionalOnExpression("#{'${spring.tcp-server.port:}'.length()>0}")
public class TcpNettyServer {@Value("${spring.tcp-server.port:22222}")private Integer port;public static void main(String[] args) throws Exception {new TcpNettyServer().server(22222);}@PostConstruct()public void initTcpServer() {try {log.info("start tcp server......");server(port);} catch (Exception e) {log.error("tcp server start failed");}}public void server(int port) throws Exception {//bossGroup就是parentGroup,是负责处理TCP/IP连接的EventLoopGroup bossGroup = new NioEventLoopGroup();//workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件EventLoopGroup workerGroup = new NioEventLoopGroup();ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1, 1);buffer.writeByte('$');ServerBootstrap sb = new ServerBootstrap();//初始化服务端可连接队列,指定了队列的大小500sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)//保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true)// 绑定客户端连接时候触发操作.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sh) throws Exception {//handler是按顺序执行的ChannelPipeline pipeline = sh.pipeline();//业务编码 -解决 数据粘包和半包问题-pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024 * 10, buffer));
//                        pipeline.addLast(new LoggingHandler(LogLevel.WARN));pipeline.addLast(new TcpBizFlagHandler());//业务编码//使用DataHandler类来处理接收到的消息pipeline.addLast(new TcpDataHandler());}});//绑定监听端口,调用sync同步阻塞方法等待绑定操作完ChannelFuture future = sb.bind(port).sync();if (future.isSuccess()) {log.info("tcp server is listening on  :{}", port);} else {log.error("tcp server is failed ", future.cause());//关闭线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}//成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
//        future.channel().closeFuture().await();}
}

 数据标志位接收代码

package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;
import java.util.List;@Slf4j
public class TcpBizFlagHandler extends ByteToMessageDecoder {public static final String BIZ_FLAG = "bizFlag";private static final String FLAG_PRE = "@@{";private static final String FLAG_SUF = "}##";private static final byte[] FLAG_PREFIX = FLAG_PRE.getBytes(StandardCharsets.UTF_8);private static final byte[] FLAG_SUFFIX = FLAG_SUF.getBytes(StandardCharsets.UTF_8);@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < FLAG_PREFIX.length + FLAG_SUFFIX.length) {log.warn("数据长度不够");text(in);return;}int prefixIndex = in.readerIndex();if (!startsWith(in)) {text(in);// 忽略非标志位开头的数据in.skipBytes(in.readableBytes());log.warn("数据不包含指定的前缀");return;}int suffixIndex = indexOf(in);if (suffixIndex == -1) {log.warn("数据不包含指定的某字符");text(in);return;}int flagLength = suffixIndex - prefixIndex + FLAG_SUFFIX.length;byte[] flagBytes = new byte[flagLength];in.readBytes(flagBytes); // 读取标志位// 保留标志位的对象结构-以@@{开头以}##结尾,形如@@{"k":"v"}##{"k":"v"}$,@@和##之间的数据为补充的对象参数JSON,$为换行符号String flag = new String(flagBytes, FLAG_PRE.length() - 1, flagBytes.length - FLAG_PREFIX.length - FLAG_SUFFIX.length + 2, StandardCharsets.UTF_8);// 保存标志位到 Channel 属性中供后续使用ctx.channel().attr(AttributeKey.valueOf(BIZ_FLAG)).set(flag);// 剩余数据继续传递给下一个 Handler 处理(透传)out.add(in.readRetainedSlice(in.readableBytes()));}private static void text(ByteBuf in) {byte[] msgByte = new byte[in.readableBytes()];in.readBytes(msgByte);log.warn("数据:{}", new String(msgByte, StandardCharsets.UTF_8));}private boolean startsWith(ByteBuf buf) {for (int i = 0; i < TcpBizFlagHandler.FLAG_PREFIX.length; i++) {if (buf.getByte(buf.readerIndex() + i) != TcpBizFlagHandler.FLAG_PREFIX[i]) {return false;}}return true;}private int indexOf(ByteBuf buf) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();for (int i = 0; i <= readableBytes - TcpBizFlagHandler.FLAG_SUFFIX.length; i++) {boolean match = true;for (int j = 0; j < TcpBizFlagHandler.FLAG_SUFFIX.length; j++) {if (buf.getByte(readerIndex + i + j) != TcpBizFlagHandler.FLAG_SUFFIX[j]) {match = false;break;}}if (match) {return readerIndex + i;}}return -1;}
}

业务转发/解析代码 

package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;import static com.aimsphm.forward.tcp.server.TcpBizFlagHandler.BIZ_FLAG;@Slf4j
@Service
public class TcpDataHandler extends ChannelInboundHandlerAdapter {//    @Resourceprivate KafkaTemplate<String, Object> template;//接受client发送的消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Channel channel = ctx.channel();// 获取标志位String flag = (String) channel.attr(AttributeKey.valueOf(BIZ_FLAG)).get();if (ObjectUtils.isEmpty(flag)) {log.warn("没有业务标识");return;}ByteBuf buf = (ByteBuf) msg;byte[] msgByte = new byte[buf.readableBytes()];buf.readBytes(msgByte);
//        template.send("haha.haha.ha", gbk.getBytes());log.info("bizFag:{},data: {}", flag, new String(msgByte));}//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}//读操作时捕获到异常时调用@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}//客户端去和服务端连接成功时触发@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client [你好,客户端]".getBytes()));log.info("client 连接成功: {}", ctx.channel());}
}

3.编写客户端代码

TCP Client 代码

package com.huanyu.forward.tcp.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import java.util.stream.IntStream;@Getter
@Slf4j
public class TcpNettyClient {public static void main(String[] args) {extracted();}private static void extracted() {try {TcpNettyClient client = new TcpNettyClient("localhost", 4444);Channel channel = client.getChannel();IntStream.range(0, 1000).parallel().forEach(i -> {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();buf.writeBytes(("@@{\"cell-topic" + (i + 1) + "\":true}##{01#.01#\":\"data1\"}").getBytes());buf.writeByte('$');channel.writeAndFlush(buf);});} catch (Exception e) {log.error("出现异常:", e);}}private Channel channel;//连接服务端的端口号地址和端口号public TcpNettyClient(String host, int port) {tcpClient(host, port);}public void tcpClient(String host, int port) {try {final EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class)  // 使用NioSocketChannel来作为连接用的channel类.handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器@Overridepublic void initChannel(SocketChannel ch) throws Exception {System.out.println("正在连接中...");ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new TcpClientHandler()); //客户端处理类}});//发起异步连接请求,绑定连接端口和host信息final ChannelFuture future = b.connect(host, port).sync();future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture arg0) throws Exception {if (future.isSuccess()) {log.info("连接服务器成功:");} else {log.warn("连接服务器失败:");System.out.println("连接服务器失败");group.shutdownGracefully(); //关闭线程组}}});this.channel = future.channel();} catch (InterruptedException e) {log.error("TCP服务端启动异常:", e);}}}

 客户端数据解析代码

package com.huanyu.forward.tcp.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.Map;public class TcpClientHandler extends SimpleChannelInboundHandler<Map<String, ByteBuf>> {//处理服务端返回的数据@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Map<String, ByteBuf> data) throws Exception {ByteBuf msg = data.get("topic");byte[] msgByte = new byte[msg.readableBytes()];msg.readBytes(msgByte);System.out.println("接受到server响应数据: " + new String(msgByte));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.copiedBuffer("hello server 你好".getBytes()));super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

 备注

1. 为了尽可能的降低性能消耗,数据以字节数组的形式发送

2. 业务字段通过@@{"key":"value"}##作为消息的头部,用数据标志位处理器进行处理

3. 真实要传送的数据,并不解析出来,并以$结尾,解决粘包和半包问题

记录备查

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/909370.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cursor链接远程服务器实现项目部署

想获取更多高质量的Java技术文章&#xff1f;欢迎访问Java技术小馆官网&#xff0c;持续更新优质内容&#xff0c;助力技术成长 技术小馆官网 在软件开发过程中&#xff0c;远程服务器开发是一种常见的工作模式。通过远程连接服务器进行代码编写和环境配置&#xff0c;可以充分…

Redis集群模式之Redis Cluster(3)

上篇文章我们讲解了Redis Cluster的状态监测与恢复过程&#xff0c;这篇文章我们来进行Redis Cluster内容的收尾&#xff0c;将其扩容和缩容的过程进行讲解&#xff0c;并分析RedisCluster的优缺点。 扩容和缩容 当集群中出现容量限制或者其他一些原因需要扩容时&#xff0c;R…

Cursor ReAct Agent技术架构

一、架构核心思想 “零熵操作交给AI”理念 Cursor通过ReAct模式实现编程中重复性工作的自动化&#xff1a; 零熵操作&#xff1a;机械性任务&#xff08;代码补全/格式化/重构/语法修复/导入管理&#xff09; Tab-away机制&#xff1a;一键接受AI建议&#xff0c;保持思维连续…

国学IP行业实战洞察:聚焦创客匠人,解锁创始人IP与知识变现新路径

国学行业正经历“文化价值”与“商业变现”的深度融合&#xff0c;2023年市场规模突破千亿大关&#xff0c;年增速超 10%。在“IP化数字化”浪潮中&#xff0c;创客匠人作为垂直领域技术服务商&#xff0c;以全链路工具矩阵为支点&#xff0c;撬动国学创始人IP从内容生产到商业…

R语言开发入门完整指南

R语言开发入门完整指南 目录 R语言简介环境配置包管理基本语法数据类型和结构数据操作统计分析数据可视化编程结构实用技巧学习资源 R语言简介 R是一种专为统计计算和图形设计的编程语言&#xff0c;广泛应用于数据分析、统计建模、机器学习和数据可视化。R语言具有以下特点…

ObservedV2装饰器和Trace装饰器

为了对嵌套类对象属性变化直接观测&#xff0c;华为提供了ObservedV2和Trace装饰器。这两个装饰器必须搭配使用&#xff0c;单独使用任何一个都不会起任何作用&#xff1b;在继承类中也可监测&#xff1b;ObservedV2的类实例目前不支持使用JSON.stringify进行序列化&#xff0c…

6月计算机新书:深度学习、大模型、DeepSeek

六月&#xff0c;这个充满活力与希望的季节&#xff0c;三本重磅新书《深度学习&#xff1a;基础与概念》、《MCP极简开发&#xff1a;轻松打造高效智能体》与《大模型应用开发&#xff1a;RAG实战课》翩然而至&#xff0c;为我们开启了一场探索科技前沿的奇妙之旅。一起来看详…

扁平风格职场商务通用PPT模版分享

扁平风格PPT模版&#xff0c;创意卡通扁平化通用PPT模版&#xff0c;创意扁平化励志论文答辩PPT模版&#xff0c;卡通职场商务PPT模版&#xff0c;职场培训&#xff0c;项目策划&#xff0c;工作总结类PPT模版&#xff0c;互联网电子商务PPT模版 扁平风格职场商务通用PPT模版分…

jupyter内核崩溃

最近在做用k-mer评估基因组规模的任务&#xff0c;其中一个局部程序&#xff0c;想偷懒&#xff0c;直接在jupyter中跑了下结果&#xff0c;想看看这一小步处理如何&#xff0c;结果没想到内核崩溃了&#xff01; 这一步我的草稿代码如下&#xff1a; import pandas as pd imp…

Java企业技术趋势分析:AI应用的落地实践与未来展望

Java企业技术趋势分析&#xff1a;AI应用的落地实践与未来展望 开篇&#xff1a;技术趋势与市场需求 在当前快速发展的数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;已经成为推动企业创新和效率提升的关键力量。Java作为企业级应用开发的主流语言&#xff0c;正…

每日Prompt:Steve Winter风格插画

提示词 世界摄影大师杰作&#xff0c;极简主义&#xff0c;Steve Winter风格&#xff0c;6只不同颜色的布偶猫围成一圈&#xff0c;看向镜头中心&#xff0c;仰天视角&#xff0c;天空背景&#xff0c;高品质细节&#xff0c;超精细CG&#xff0c;高分辨率&#xff0c;最佳品质…

Vue3 + Element Plus 获取表格列信息

在 Vue 3 和 Element Plus 中&#xff0c;可以通过以下步骤获取表格的列信息&#xff1a; 实现步骤&#xff1a; 使用 ref 绑定表格实例 通过表格实例的 store.states.columns 获取列数据 处理列信息&#xff08;过滤隐藏列、处理嵌套表头等&#xff09; 示例代码&#xf…

logger2js - JavaScript日志与调试工具库

logger2js - JavaScript日志与调试工具库 logger2js是一个功能强大的前端JavaScript日志与调试工具库&#xff0c;提供了丰富的日志输出、性能测试和代码调试功能。该库支持配置化引入&#xff0c;包含5种皮肤风格和丰富的API接口&#xff0c;如 a l e r t 增强方法、 alert增…

Stone 3D使用RemoteMesh组件极大的缩小工程文件尺寸

Stone 3D的工程文件tsp默认包含了场景中所有的对象和数据&#xff0c;这样的好处是tsp可以单独离线保存&#xff0c;但坏处是tsp文件通常偏大。 解决这个问题的方法是把外部glb模型文件通过RemoteMesh组件来加载。 首先创建一个空实体&#xff0c;然后给该空实体添加RemoteMe…

【深入剖析】攻克 Java 并发的基石:Java 内存模型 (JMM) 原理与实践指南

0.引言 理解 JMM (Java Memory Model - JMM) 是掌握 Java 并发编程的关键&#xff0c;它定义了多线程环境下&#xff0c;线程如何与主内存以及彼此之间交互内存数据。 核心目标&#xff1a; JMM 旨在解决多线程编程中的三个核心问题&#xff1a; 原子性 (Atomicity)&#xf…

【Three.js】初识 Three.js

Threejs介绍 我们开发 webgl 主要是使用 threejs 这个库&#xff0c;因为 webGL太难用&#xff0c;太复杂&#xff01;但是现代浏览器都支持WebGL&#xff0c;这样我们就不必使用Flash、Java等插件就能在浏览器中创建三维图形。 threejs 它提供-一个很简单的关于WebGL特性的J…

【经验总结】ECU休眠后连续发送NM报文3S后ECU网络才被唤醒问题分析

目录 前言 正文 1.问题描述 2.问题分析 3.验证猜想 4.总结 前言 ECU的上下电/休眠唤醒在ECU开发设计过程中最容易出问题且都为严重问题,最近在项目开发过程中遇到ECU休眠状态下连续发送NM报文3S后才能唤醒CAN网络的问题,解决问题比较顺利,但分析过程中涉及到的网络休…

企业架构框架深入解析:TOGAF、Zachman Framework、FEAF与Gartner EA Framework

执行摘要 企业架构&#xff08;EA&#xff09;是一项至关重要的实践&#xff0c;它使组织能够协调其业务战略、运营流程和技术基础设施&#xff0c;以实现整体战略目标。企业架构框架作为结构化的方法论和综合性工具&#xff0c;旨在管理企业级系统的固有复杂性&#xff0c;提…

数字化动态ID随机水印和ID跑马灯实现教育视频防录屏

摘要&#xff1a;数字化动态ID随机水印和ID跑马灯技术可以有效保护数字教育资源。动态水印将用户信息随机显示在视频上且不可去除&#xff0c;能追踪录屏者并震慑盗版行为。ID跑马灯则自定义显示观看者信息&#xff0c;便于追踪盗版源头并提供法律证据。这些技术大幅增加盗版成…

< 自用文儿 腾讯云 VPS > Ubuntu 24 系统,基本设置

前言&#xff1a; 3 月份买的腾讯云的这台 VPS&#xff0c;刚发现现在退款&#xff0c;只能返回 0 元。测试应用已经迁移到JD&#xff0c;清除内容太麻烦&#xff0c;重装更简单。 因为配合政策&#xff0c;国内的云主机都有两个 IP 地址&#xff0c;一个内网&#xff0c;一个…