【如何实现分布式压测中间件】

分布式压测中间件的原理及其实现

    • 原理
    • 全链路追踪框架(Trace)
    • MQ中间件
    • 数据库
    • 分布式缓存中间件(Redis)
    • 分库分表中间件

原理

通过大量阅读中间件源码,开源社区调研,得到设计原理:
(1)发起压测链路http请求
(2)通过分布式追踪框架获取URL上影子标识,将其放入上下文Context中
(3)提供者应用发起PRC/MQ调用时,中间件会将测试标放入中间件的Context上下文中传递。
(4)消费者处理RPC/MQ消息,获取中间件Context上下文。
(5)经过分库分表/缓存数据库中间件,获取当前Context里的影子标识。

打成Maven包,在项目中直接引入

  1. 可插拔,业务代码不感知。
  2. 支持复杂SQL处理,支持全链路测试,且支持全链路追踪。
  3. 极大提高压测工作效率。

全链路追踪框架(Trace)

从HTTP请求链接上识别到特定的key,如:

URL添加压测标识,test = true,将压测标识添加到追踪链路框架中的Context上下文中。

MQ中间件

例如RocketMQ: com.alibaba.rocketmq.client.hook.SendMessageHook
实现接口SendMessageHook进行日志追踪链路埋点, 分布式链路组件SOFA
Trace也是基于这个接口去埋点,这是mq官方留给实现者的AOP。

public class MetaQSendMessageHookImpl implements SendMessageHook, MetaQTraceConstants {public MetaQSendMessageHookImpl() {}public String hookName() {return "EagleEyeSendMessageHook";}public void sendMessageBefore(SendMessageContext context) {if (context != null && context.getMessage() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {MetaQTraceContext mqTraceContext = new MetaQTraceContext();context.setMqTraceContext(mqTraceContext);mqTraceContext.setMetaQType(MetaQType.METAQ);mqTraceContext.setGroup(context.getProducerGroup());mqTraceContext.setAsync(CommunicationMode.ASYNC.equals(context.getCommunicationMode()));Message msg = context.getMessage();if (msg != null) {MetaQTraceBean traceBean = new MetaQTraceBean();traceBean.setTopic(msg.getTopic());traceBean.setOriginMsgId(MessageAccessor.getOriginMessageId(msg));traceBean.setTags(msg.getTags());traceBean.setKeys(msg.getKeys());traceBean.setBuyerId(msg.getBuyerId());traceBean.setTransferFlag(MessageAccessor.getTransferFlag(msg));traceBean.setCorrectionFlag(MessageAccessor.getCorrectionFlag(msg));traceBean.setBodyLength(msg.getBody().length);traceBean.setBornHost(context.getBornHost());traceBean.setStoreHost(context.getBrokerAddr());traceBean.setBrokerName(context.getMq().getBrokerName());traceBean.setProps(context.getProps());traceBean.setMsgType(context.getMsgType());List<MetaQTraceBean> beans = new ArrayList();beans.add(traceBean);mqTraceContext.setTraceBeans(beans);if (StringUtils.isNotBlank(msg.getUserProperty("eagleTraceId"))) {traceBean.setTraceId(msg.getUserProperty("eagleTraceId"));traceBean.setRpcId(msg.getUserProperty("eagleRpcId"));traceBean.setEagleEyeUserData(msg.getUserProperty("eagleData"));}MetaQSendMessageTraceLog.sendMessageBefore(mqTraceContext);if (StringUtils.isBlank(msg.getProperty("eagleTraceId")) && StringUtils.isNotBlank(traceBean.getTraceId())) {msg.putUserProperty("eagleTraceId", traceBean.getTraceId());msg.putUserProperty("eagleRpcId", traceBean.getRpcId());msg.putUserProperty("eagleData", traceBean.getEagleEyeUserData());}}}}public void sendMessageAfter(SendMessageContext context) {if (context != null && context.getMessage() != null && context.getSendResult() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {MetaQTraceContext mqTraceContext = (MetaQTraceContext)context.getMqTraceContext();mqTraceContext.setRegionId(context.getSendResult().getRegionId());MetaQTraceBean traceBean = (MetaQTraceBean)mqTraceContext.getTraceBeans().get(0);if (traceBean != null && context.getSendResult() != null) {traceBean.setQueueId(context.getMq().getQueueId());traceBean.setMsgId(context.getSendResult().getOffsetMsgId());traceBean.setOriginMsgId(context.getSendResult().getMsgId());traceBean.setOffset(context.getSendResult().getQueueOffset());mqTraceContext.setSuccess(true);mqTraceContext.setStatus(context.getSendResult().getSendStatus().toString());} else if (context.getException() != null) {String msg = context.getException().getMessage();mqTraceContext.setErrorMsg(StringUtils.substring(msg, 0, msg.indexOf("\n")));}MetaQSendMessageTraceLog.sendMessageAfter(mqTraceContext);}}
}

数据库

参考数据库Druid链接池方案:
https://github.com/alibaba/druid/wiki/SQL-Parser

import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.filter.FilterChain;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.proxy.jdbc.DataSourceProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.util.JdbcConstants;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.List;/*** 拦截druid数据链接池* @author doge* @date 2021/10/19*/
@Slf4j
public class DruidShadowTestFilter extends FilterAdapter {private DruidShadowTestVisitor visitor = new DruidShadowTestVisitor();@Overridepublic boolean statement_execute(FilterChain chain, StatementProxy statement, String sql) throws SQLException {try {List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor));if (visitor.getRewriteStatus()) {// 改写了SQL,需要替换String newSql = SQLUtils.toSQLString(sqlStatements,JdbcConstants.MYSQL);log.debug("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql);return super.statement_execute(chain, statement, newSql);}return super.statement_execute(chain, statement, sql);} finally {visitor.removeRewriteStatus();}}@Overridepublic void init(DataSourceProxy dataSourceProxy){if (!(dataSourceProxy instanceof DruidDataSource)) {log.error("ConfigLoader only support DruidDataSource");}DruidDataSource dataSource = (DruidDataSource) dataSourceProxy;log.info("db configuration: url="+ dataSource.getUrl());}}
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.ewtp.test.utils.ShadowTestUtil;
import org.apache.commons.lang3.StringUtils;import java.util.Optional;/*** @author doge* @date 2021/10/20*/
public class DruidShadowTestVisitor extends MySqlASTVisitorAdapter {private static final ThreadLocal<Boolean> REWRITE_STATUS_CACHE = new ThreadLocal<>();@Overridepublic boolean visit(SQLExprTableSource sqlExprTableSource) {// 别名,如果有别名,别名保持不变String alias = StringUtils.isEmpty(sqlExprTableSource.getAlias()) ? sqlExprTableSource.getExpr().toString() : sqlExprTableSource.getAlias();// 修改表名,不包含点才加 select c.id,d.name from c left join d on c.id = d.id 中的c 和 dif(!sqlExprTableSource.getExpr().toString().contains(".")) {sqlExprTableSource.setExpr(ShadowTestUtil.PREFIX  + sqlExprTableSource.getExpr());}sqlExprTableSource.setAlias(alias);REWRITE_STATUS_CACHE.set(true);return true;}/*** 返回重写状态* @return 重写状态,{@code true}表示已重写,{@code false}表示未重写*/public boolean getRewriteStatus() {// get reset rewrite statusreturn Optional.ofNullable(REWRITE_STATUS_CACHE.get()).orElse(Boolean.FALSE);}/*** 重置重写状态*/public void removeRewriteStatus() {REWRITE_STATUS_CACHE.remove();}
}

分布式缓存中间件(Redis)

可以参考SofaTrace做法
https://www.sofastack.tech/blog/sofa-channel-15-retrospect/

  1. 新增一个Redis的后置增强器(部分代码)
  2. 实现redis的连接工厂(部分代码)
  3. 实现redis的连接器(会在所有redis key前加上前缀 test

import com.alibaba.ewtp.test.factory.TracingRedisConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
```java/*** @author doge* @date 2021/10/14* redis 后置增强处理*/
public class TracingRedisBeanPostProcessor implements BeanPostProcessor {public TracingRedisBeanPostProcessor(){}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof RedisConnectionFactory) {bean = new TracingRedisConnectionFactory((RedisConnectionFactory) bean);}return bean;}
}import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.*;/*** @author doge* @date 2021/10/14*/
public class TracingRedisConnectionFactory implements RedisConnectionFactory {private final RedisConnectionFactory delegate;public TracingRedisConnectionFactory(RedisConnectionFactory delegate) {this.delegate = delegate;}@Overridepublic RedisConnection getConnection() {// support cluster connectionRedisConnection connection = this.delegate.getConnection();return new TracingRedisConnection(connection);}@Overridepublic RedisClusterConnection getClusterConnection() {return delegate.getClusterConnection();}@Overridepublic boolean getConvertPipelineAndTxResults() {return delegate.getConvertPipelineAndTxResults();}@Overridepublic RedisSentinelConnection getSentinelConnection() {return delegate.getSentinelConnection();}@Overridepublic DataAccessException translateExceptionIfPossible(RuntimeException e) {return delegate.translateExceptionIfPossible(e);}
}import org.springframework.dao.DataAccessException;
import org.springframework.data.geo.Distance;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.core.types.RedisClientInfo;import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;/*** @author doge* @date 2021/10/14*/
public class TracingRedisConnection implements RedisConnection {private final RedisConnection connection;public TracingRedisConnection(RedisConnection connection) {this.connection = connection;}@Overridepublic Boolean expire(byte[] key, long seconds) {handleByte(key);return connection.expire(key, seconds);}@Overridepublic Boolean set(byte[] key, byte[] value) {handleByte(key);return connection.set(key, value);}@Overridepublic Boolean mSet(Map<byte[], byte[]> tuple) {handleByteMap(tuple);return connection.mSet(tuple);}public void handleByte(byte[] key){if (ShadowTestUtil.isShadowTesLink()){key = (ShadowTestUtil.prefix + new String(key)).getBytes();}}public void handleBytes(byte[]... keys){if (ShadowTestUtil.isShadowTesLink()){for (byte[] bytes : keys){handleByte(bytes);}}}public void handleByteMap(Map<byte[], byte[]> tuple){if (ShadowTestUtil.isShadowTesLink()){for (Map.Entry<byte[], byte[]> entry : tuple.entrySet()){handleByte(entry.getKey());}}}}

分库分表中间件

开源框架的解决方案:
https://shardingsphere.apache.org/document/current/cn/features/shadow

方案&思路: 当获取压测标后,若开启影子链路,将打开Sharding影子库的开关,串通起整个分库分表链路。当然也可以直接用数据库连接池来解决。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87376.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87376.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt进程间保活方案:详解如何实现进程间通信与自动保活机制

目录 摘要 一、进程间保活的基本原理 二、具体步骤及代码示例 三、常见问题与优化 四、总体方案 摘要 在一些需要长时间运行的应用程序中&#xff0c;确保进程在意外退出时能够自动重启是一项非常重要的任务。尤其是在嵌入式开发、后台服务以及需要高可用性的场景下&#x…

Python-内置数据结构-list-tuple-bubble-字符串-bytes-bytesarray-切片-学习笔记

欠4年前自己的一份笔记&#xff0c;献给今后的自己。 分类 数值型 int、float、complex、bool 序列对象 字符串 str 列表 list tuple 键值对 集合set 字典dict 数值型 int、float、complex、bool都是class&#x…

利用事务钩子函数解决业务异步发送问题

利用事务钩子函数解决业务异步发送问题 一、问题背景二、实现方案1、生产者代码2、消费者代码 三、测试与验证1、未开启事务场景2、开启事务场景 四、项目结构及源码 一、问题背景 在某项业务中&#xff0c;需要在事务完成后&#xff0c;写入日志到某数据库中。需要要么都成功&…

uniapp选择相册

概述 一款针对Android平台下的图片选择器&#xff0c;支持从相册获取图片、视频、音频&拍照&#xff0c;支持裁剪(单图or多图裁剪)、压缩、主题自定义配置等功能&#xff0c;支持动态获取权限&适配Android 5.0系统的开源图片选择框架。 支持Uniapp和Uniapp X下的Vue2、…

MAC 多应用切换技巧,单应用切换技巧

在 Mac 上&#xff0c;有几种快捷键可以帮助你快速切换应用程序窗口&#xff1a; 1. Command (⌘) Tab - 这是最常用的快捷键&#xff0c;用于在打开的应用程序之间进行循环切换。按住 Command 键不放&#xff0c;然后反复按下 Tab 键可以选择下一个应用程序。当你松开 Comm…

SpringBoot+本地部署大模型实现知识库功能

SpringBoot本地部署大模型实现RAG知识库功能 1、Linux系统部署本地大模型1.1 安装ollama1.2 启动ollama1.3 下载deepseek模型 2、Springboot代码调用本地模型实现基础问答功能3、集成向量数据库4、知识库数据喂取5、最终实现RAG知识库功能 1、Linux系统部署本地大模型 1.1 安装…

嵌入式原理与应用篇---ARM

ARM 架构的 STM32 系列微控制器广泛应用于嵌入式系统开发&#xff0c;理解其汇编语言指令对于优化性能、访问硬件底层非常重要。下面详细解释常见的 ARM 汇编指令及其使用实例。 数据处理指令 1. MOV&#xff08;移动&#xff09; 功能&#xff1a;将立即数或寄存器值复制到…

【RHCSA-Linux考试题目笔记(自用)】servera的题目

一、开始 1、启动rhcsa环境 2、点击题目&#xff0c;看题 3、通过控制器来启动所有虚拟机 控制器 打开后点start&#xff0c;然后ok 之后进入一个有classroom、servera、serverb&#xff08;考试不一定叫这些名&#xff0c;但大差不差&#xff09;什么之类的界面&#xff0c;…

SpringBoot项目使用arthas-tunnel-server

参考官网Arthas Spring Boot Starter | arthas Spring Boot系列之使用Arthas Tunnel Server 进行远程调试实践-腾讯云开发者社区-腾讯云 springBoot项目, 增加maven依赖 <dependency><groupId>com.taobao.arthas</groupId><artifactId>arthas-sprin…

Modbus TCP 进阶:基于以太网的远程设备控制(二)

基于 Modbus TCP 的远程设备控制实战 &#xff08;一&#xff09;硬件与网络搭建实操 1. 设备选型与连接 在工业现场&#xff0c;根据远程控制需求进行设备选型至关重要 。对于传感器&#xff0c;若要监测温度&#xff0c;可选择高精度的热电偶传感器&#xff0c;如 K 型热电…

分库分表之实战-sharding-JDBC

大家好&#xff0c;我是工藤学编程 &#x1f989;一个正在努力学习的小博主&#xff0c;期待你的关注实战代码系列最新文章&#x1f609;C实现图书管理系统&#xff08;Qt C GUI界面版&#xff09;SpringBoot实战系列&#x1f437;【SpringBoot实战系列】Sharding-Jdbc实现分库…

httpcore-nio引起的线程、fd泄露问题

依赖来源&#xff1a;httpasyncclient-4.1.4.jar 现象 程序报错too many open files 线程数飙升、句柄数飙升 thread dump显示大量 "I/O dispatcher 7215" #9102 prio5 os_prio0 tid0x00002b7ba036a800 nid0x6f24 runnable [0x00002b7d98d41000]java.lang.Thread.…

多线程生产者消费者模型实战案例

多线程生产者消费者模型实战案例 前言业务场景术前准备无锁无事务有事务 synchronized事务在锁外事务在锁内 数据库行锁什么是数据库行锁有事务没有事务 乐观锁ReentrantLock分布式锁 前言 曾经一直有一个疑惑&#xff0c;就是关于多线程生产者消费者模型的学习过程中&#xf…

青少年编程与数学 02-022 专业应用软件简介 03 三维建模及动画软件:Autodesk Maya

青少年编程与数学 02-022 专业应用软件简介 03 三维建模及动画软件&#xff1a;Autodesk Maya 一、什么是三维建模二、什么是计算机动画三、三维建模及动画设计软件的发展历程&#xff08;一&#xff09;早期探索阶段&#xff08;20世纪60年代 - 80年代&#xff09;&#xff08…

获得 OCM 大师证书学习历练

当我站在山城重庆的洪崖洞前&#xff0c;看着璀璨的夜景倒映在嘉陵江上&#xff0c;手中紧握着 OCM 大师证书&#xff0c;那一刻&#xff0c;备考时的艰辛与考试时的紧张都化作了满满的成就感。这段在重庆获得 OCM 大师证书的经历&#xff0c;就像一场充满挑战与惊喜的冒险&…

srs-gb28181 与 SRS 5.0 对 GB28181 国标支持

srs-gb28181 是基于 SRS 4.0/5.0 的国标&#xff08;GB28181&#xff09;扩展分支&#xff0c;而 SRS 5.0 官方版本也逐步增强了对 GB28181 的支持。以下是两者的主要区别&#xff1a; 1. 功能支持对比 功能srs-gb28181&#xff08;扩展分支&#xff09;SRS 5.0&#xff08;官…

算法第18天|继续二叉树:修剪二叉搜索树、将有序数组转化为二叉搜索树、把二叉搜索树转换为累加树

今日总结&#xff1a; 1、修剪二叉搜索树&#xff08;重点思考如何修剪&#xff09; &#xff08;1&#xff09;递归的返回值是什么&#xff1f;&#xff08;与插入、删除一样&#xff09; &#xff08;2&#xff09;递归的单层逻辑一定要缕清&#xff08;3中情况讨论&#xff…

C# 多线程(三)线程池

目录 1.通过TPL使用线程池 2.不使用TPL进入线程池的办法 异步委托 3.线程池优化技术 最小线程数的工作原理 每当启动一个新线程时&#xff0c;系统都需要花费数百微秒来分配资源&#xff0c;例如创建独立的局部变量栈空间。默认情况下&#xff0c;每个线程还会占用约1…

学习笔记(29):训练集与测试集划分详解:train_test_split 函数深度解析

学习笔记(29):训练集与测试集划分详解&#xff1a;train_test_split 函数深度解析 一、为什么需要划分训练集和测试集&#xff1f; 在机器学习中&#xff0c;模型需要经历两个核心阶段&#xff1a; 训练阶段&#xff1a;用训练集数据学习特征与目标值的映射关系&#xff08;…

【全网唯一】自动化编辑器 Windows版纯本地离线文字识别插件

目的 自动化编辑器超轻量级RPA工具&#xff0c;零代码制作RPA自动化任务&#xff0c;解放双手&#xff0c;释放双眼&#xff0c;轻松玩游戏&#xff0c;刷任务。本篇文章主要讲解下自动化编辑器的TomatoOCR纯本地离线文字识别Windows版插件如何使用和集成。 准备工作 1、下载自…