物联网_TDengine_EMQX_性能测试

一、Tdengine接口开发文档

1、数据库

1.创建数据库

  • URL

    /dp/createdb/

  • method

    post

  • 请求示例

    {"db_name":"demo01" // 必填
    }
    
  • 响应示例

    // 成功
    {"code": 1,"data": {"成功创建数据库": "demo04"},"error": null
    }
    
    // 失败
    {"code": 0,"data": null,"error": "创建数据库失败, 该数据库已存在!"
    }2.获取数据库
    

2.获取数据库

  • URL

    /dp/createdb/

  • method

    get

  • 请求示例

    /dp/createdb/?db_name=test

    参数:db_name 数据库名称,非必填

    未填数据库名称则查询所有数据库

    填写数据库名称则查询数据库参数

  • 响应示例

    // 查询所有数据库
    {"code": 1,"data": ["information_schema","performance_schema","demo01","demo06","test","demo03","demo02"],"error": null
    }
    
    // 查询数据库参数
    {"code": 1,"data": {"name": "test","create_time": "2024-02-21T10:33:32.727000","vgroups": 2,"ntables": 2,"replica": 1,"strict": "on","duration": "14400m","keep": "5256000m,5256000m,5256000m","buffer": 256,"pagesize": 4,"pages": 256,"minrows": 100,"maxrows": 4096,"comp": 2,"precision": "ms","status": "ready","retentions": null,"single_stable": false,"cachemodel": "last_row","cachesize": 1,"wal_level": 1,"wal_fsync_period": 3000,"wal_retention_period": 0,"wal_retention_size": 0,"wal_roll_period": 0,"wal_segment_size": 0,"stt_trigger": 1,"table_prefix": 0,"table_suffix": 0,"tsdb_pagesize": 4},"error": null
    }
    

3.修改数据库参数

  • URL

    /dp/createdb/

  • method

    put

  • 请求示例

    {"db_name":"demo01", // 必填"cachemodel":"none","wal_level":"1"
    }
    
    • CACHEMODEL:表示是否在内存中缓存子表的最近数据。默认为 none。
      • none:表示不缓存。
      • last_row:表示缓存子表最近一行数据。这将显著改善 LAST_ROW 函数的性能表现。
      • last_value:表示缓存子表每一列的最近的非 NULL 值。这将显著改善无特殊影响(WHERE、ORDER BY、GROUP BY、INTERVAL)下的 LAST 函数的性能表现。
      • both:表示同时打开缓存最近行和列功能。 Note:CacheModel 值来回切换有可能导致 last/last_row 的查询结果不准确,请谨慎操作。推荐保持打开。
    • CACHESIZE:表示每个 vnode 中用于缓存子表最近数据的内存大小。默认为 1 ,范围是[1, 65536],单位是 MB。
    • BUFFER: 一个 VNODE 写入内存池大小,单位为 MB,默认为 256,最小为 3,最大为 16384。
    • PAGES:一个 VNODE 中元数据存储引擎的缓存页个数,默认为 256,最小 64。一个 VNODE 元数据存储占用 PAGESIZE * PAGES,默认情况下为 1MB 内存。
    • PAGESIZE:一个 VNODE 中元数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB 到 16 MB。
    • REPLICA:表示数据库副本数,取值为 1 或 3,默认为 1。在集群中使用,副本数必须小于或等于 DNODE 的数目。
    • STT_TRIGGER:表示落盘文件触发文件合并的个数。默认为 1,范围 1 到 16。对于少表高频场景,此参数建议使用默认配置,或较小的值;而对于多表低频场景,此参数建议配置较大的值。
    • WAL_LEVEL:WAL 级别,默认为 1。
      • 1:写 WAL,但不执行 fsync。
      • 2:写 WAL,而且执行 fsync。
    • WAL_FSYNC_PERIOD:当 WAL_LEVEL 参数设置为 2 时,用于设置落盘的周期。默认为 3000,单位毫秒。最小为 0,表示每次写入立即落盘;最大为 180000,即三分钟。
    • DURATION:数据文件存储数据的时间跨度。可以使用加单位的表示形式,如 DURATION 100h、DURATION 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。不加时间单位时默认单位为天,如 DURATION 50 表示 50 天。
    • KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于3倍的 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。企业版支持多级存储功能, 因此, 可以设置多个保存时间(多个以英文逗号分隔,最多 3 个,满足 keep 0 <= keep 1 <= keep 2,如 KEEP 100h,100d,3650d); 社区版不支持多级存储功能(即使配置了多个保存时间, 也不会生效, KEEP 会取最大的保存时间)。
    • COMP:表示数据库文件压缩标志位,缺省值为 2,取值范围为[0, 2]。
      • 0:表示不压缩。
      • 1:表示一阶段压缩。
      • 2:表示两阶段压缩。
    • SINGLE_STABLE:表示此数据库中是否只可以创建一个超级表,用于超级表列非常多的情况。
      • 0:表示可以创建多张超级表。
      • 1:表示只可以创建一张超级表。
    • MAXROWS:文件块中记录的最大条数,默认为 4096 条。
    • MINROWS:文件块中记录的最小条数,默认为 100 条。
    • TSDB_PAGESIZE:一个 VNODE 中时序数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB到 16 MB。
    • WAL_RETENTION_PERIOD: 为了数据订阅消费,需要WAL日志文件额外保留的最大时长策略。WAL日志清理,不受订阅客户端消费状态影响。单位为 s。默认为 3600,表示在 WAL 保留最近 3600 秒的数据,请根据数据订阅的需要修改这个参数为适当值。
    • WAL_RETENTION_SIZE:为了数据订阅消费,需要WAL日志文件额外保留的最大累计大小策略。单位为 KB。默认为 0,表示累计大小无上限。
  • 响应示例

    {"code": 1,"data": {"修改数据库参数成功": 0},"error": null
    }
    

4.删除数据库

  • URL

    /dp/createdb/

  • method

    delete

  • 请求示例

    {// 可以删除多个"db_names":["demo04","demo03"]
    }
    
  • 响应示例

    {"code": 1,"data": {"删除数据库成功": 2},"error": null
    }
    

2、超级表

1.创建超级表

  • URL

    /dp/createstable/

  • method

    post

  • 请求示例

    {"db_name": "demo06", // 必填"stable_name": "st01", // 必填"fields": {"ts": "TIMESTAMP", // 必填"current": "FLOAT", // 最低选填一个"voltage": "INT","phase": "FLOAT"},"tags":{"point":"varchar(100)", // 最低选填一个"status":"varchar(100)"}
    }
    
  • 响应示例

    {"code": 1,"data": {"创建超级表成功": "st04"},"error": null
    }
    

2.查询超级表

  • URL

    /dp/createstable/

  • method

    get

  • 请求示例

    /dp/createstable/?db_name=demo06&stable_name=st01

    参数:db_name 数据库名称,必填

    ​ stable_name 超级表名称,非必填

    未填超级表名称则查询该数据库下的所有超级表

    填写超级表名称则查询该数据库下的超级表的结构信息

  • 响应示例

    // 查询该库下的所有超级表
    {"code": 1,"data": ["st01","st02","st03","st04","demo06"],"error": null
    }
    
    // 查询该库下的该超级表的结构信息
    {"code": 1,"data": [{"field": "ts","type": "TIMESTAMP","length": 8,"note": ""},{"field": "current","type": "FLOAT","length": 4,"note": ""},{"field": "voltage","type": "INT","length": 4,"note": ""},{"field": "phase","type": "FLOAT","length": 4,"note": ""},{"field": "point","type": "VARCHAR","length": 100,"note": "TAG"},{"field": "status","type": "VARCHAR","length": 100,"note": "TAG"}],"error": null
    }
    

3.修改超级表

  • URL

    /dp/createstable/

  • method

    put

  • 请求示例

    // 添加表注释
    {"db_name":"demo06","stable_name":"st01","command":"COMMENT","value": ["This is a table comment."]
    }
    
    // 添加新列
    {"db_name":"demo06","stable_name":"st01","command":"ADD COLUMN","value": ["test","FLOAT"]
    }
    
    // 删除列
    {"db_name":"demo06","stable_name":"st01","command":"DROP COLUMN","value": ["test"]
    }
    
    // 重命名标签
    {"db_name":"demo06","stable_name":"st01","command":"RENAME TAG","value": ["test_tag01","test_tag001"]
    }
    
    • “command”:“COMMENT”–添加表注释
    • “command”:“ADD COLUMN”–添加新列
    • “command”:“DROP COLUMN”–删除列
    • “command”:“ADD TAG”–添加标签
    • “command”:“DROP TAG”–删除标签
    • “command”:“RENAME TAG”–重命名标签
  • 响应信息

    {"code": 1,"data": {"修改超级表": "修改成功!"},"error": null
    }
    

4.删除超级表

  • URL

    /dp/createstable/

  • method

    delete

  • 请求示例

    // 可删除多个
    {"db_name":"demo06","stable_names":["st06","st05"]
    }
    
  • 响应示例

    {"code": 1,"data": {"删除超级表成功": 2},"error": null
    }
    

3、子表

1.创建子表

  • URL

    /dp/createtable/

  • method

    post

  • 请求示例

    // 可以创建多个
    {"db_name":"demo06","stable_name":"st01","sub_tables":{"sub01":["办公楼一层101","正常","1"],"sub02":["办公楼二层201","正常","2"]}
    }
    
  • 响应示例

    {"code": 1,"data": {"创建子表成功": ["sub03","sub04"]},"error": null
    }
    

2.获取子表

  • URL

    /dp/createtable/

  • method

    get

  • 请求示例

    /dp/createtable/?db_name=demo06

    参数:db_name 数据库名称,必填

    ​ sub_table_name 子表名称,非必填

    未填子表名称则查询该数据库下的所有子表

    填写子表名称则查询该数据库下的子表的结构信息

  • 响应示例

    // 获取所有子表
    {"code": 1,"data": ["sub01","sub02","sub03","sub04"],"error": null
    }
    
    // 获取子表的结构信息
    {"code": 1,"data": [{"field": "ts","type": "TIMESTAMP","length": 8,"note": ""},{"field": "current","type": "FLOAT","length": 4,"note": ""},{"field": "voltage","type": "INT","length": 4,"note": ""},{"field": "phase","type": "FLOAT","length": 4,"note": ""},{"field": "test","type": "INT","length": 4,"note": ""},{"field": "point","type": "VARCHAR","length": 100,"note": "TAG"},{"field": "status","type": "VARCHAR","length": 100,"note": "TAG"},{"field": "test_tag001","type": "FLOAT","length": 4,"note": "TAG"}],"error": null
    }
    

3.修改子表

  • URL

    /dp/createtable/

  • method

    put

  • 请求示例

    // 只能修改子表标签值
    {"db_name":"test","sub_table_name":"meters_sub3","tag_name": "location","new_tag_value":"beijing"
    }
    
  • 响应示例

    {"code": 1,"data": {"修改子表": "修改成功!"},"error": null
    }
    

4.删除子表

  • URL

    /dp/createtable/

  • method

    delete

  • 请求示例

    // 可以同时删除多个表
    {"db_name":"test","sub_table_names"["meters_sub7","meters_sub6"]
    }
    
  • 响应示例

    {"code": 1,"data": {"删除子表成功": 2},"error": null
    }
    

4、向子表插入数据

1.插入数据

  • URL

    /dp/insertdata/

  • method

    post

  • 请求示例

    // 可以向多个子表根据字段插入多条记录
    {"db_name":"test","table_data": {"d001":{"fields": ["ts", "current", "voltage", "phase"],"values": [["2021-07-13 14:06:34.630", 10.2, 219, 0.32],["2021-07-13 14:06:35.779", 10.15, 217, 0.33]]},"d1002": {"fields": ["ts", "current", "phase"],"values": [["2021-07-13 14:06:34.255", 10.27, 0.31]]}}
    }
    
  • 响应示例

    {"code": 1,"data": {"插入数据成功": "INSERT INTO d1001 (ts, current, voltage, phase) VALUES ('2021-07-13 14:06:34.630', '10.2', '219', '0.32') ('2021-07-13 14:06:35.779', '10.15', '217', '0.33') d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', '10.27', '0.31') "},"error": null
    }
    

2.修改数据

  • URL

    /dp/insertdata/

  • method

    put

  • 请求示例

    // 根据时间戳修改数据
    {"db_name":"test","sub_table_name":"d1001","ts_name":"ts","ts_value":"2021-07-13 00:00:00.000","fields":["current","voltage"],"value":["11","2"]
    }
    
  • 响应示例

    {"code": 1,"data": {"修改数据成功": "INSERT INTO d1001 (ts,current, voltage) VALUES ('2021-07-13 00:00:00.000','11', '2')"},"error": null
    }
    

3.删除数据

  • URL

    /dp/insertdata/

  • method

    delete

  • 请求示例

    // 删除时间戳等于2021-07-13 00:00:00.000的数据
    {"db_name":"test","sub_table_name":"d1001","ts_name":"ts","ts_value":"2021-07-13 00:00:00.000","operation":"EN"
    }
    
    • “operation”:“GT” – 大于(Greater Than)
    • “operation”:“GE” – 大于等于(Greater Than or Equal)
    • “operation”:“LT” – 小于(Less Than)
    • “operation”:“LE” – 小于等于(Less Than or Equal)
    • “operation”:“EQ” – 等于(Equal)
    • “operation”:“NE” – 不等于(Not Equal)
  • 响应示例

    {"code": 1,"data": {"删除数据成功": "DELETE FROM d1001 WHERE ts = '2021-07-13 00:00:00.000'"},"error": null
    }
    

二、EMQX断线重连

1、实现流程

  1. 设置客户端配置
    • 配置持久化会话 (clean_session=False,并配置唯一ID)。
    • 设置重连策略,包括最大重试次数和重连间隔时间。
  2. 检测连接状态
    • 在连接断开的回调函数中,记录断开的原因,并尝试重连。
  3. 重连处理
    • 使用退避算法进行重连,并在重连成功后重新订阅主题。

2、断线重连逻辑

客户端断开连接后,自动调用on_disconnect方法,更改连接状态,尝试连接(调用reconnect重连方法)。

重连设置最大连接次数,以及使用退避算法,防止短时间内多次重连。

    def on_disconnect(self, client, userdata, rc):self.is_connected = Falseif rc != 0:print("客户端断开。尝试重新连接...")self.reconnect()def reconnect(self):while not self.is_connected and self.reconnect_attempts < self.max_reconnect_attempts:try:wait_time = min(2 ** self.reconnect_attempts, 60) + random.uniform(0, 1)print(f"尝试重新连接 {self.reconnect_attempts + 1}, 等待 {wait_time:.2f} 秒...")time.sleep(wait_time)self.client.reconnect()  # 使用 reconnect 方法重新连接self.is_connected = True  # 如果成功连接,设置 is_connected 为 Trueexcept Exception as e:print(f"重连失败: {e}")self.reconnect_attempts += 1if self.reconnect_attempts >= self.max_reconnect_attempts:print("已达到最大重新连接尝试次数,进入睡眠模式。")# 在这里可以添加进入休眠模式的逻辑

3、整体断线重连demo代码


import time
import random
import paho.mqtt.client as mqtt# MQTT Broker 配置
BROKER_ADDRESS = "192.168.101.130" # 服务端
BROKER_PORT = 1883 # 端口
KEEP_ALIVE_INTERVAL = 60  # 保活时间(秒)
TOPIC = "test/topic" # 主题
MAX_RECONNECT_ATTEMPTS = 10  # 最大重连尝试次数
CLIENT_ID = "test_client_id" # client_id
USERNAME = 'test' # 用户名
PASSWORD = 'test' # 密码
QOS = 1
i = 1class MQTTClient:def __init__(self, broker_address, broker_port, topic, keep_alive=60, max_reconnect_attempts=10):self.broker_address = broker_addressself.broker_port = broker_portself.topic = topicself.keep_alive = keep_aliveself.max_reconnect_attempts = max_reconnect_attemptsself.reconnect_attempts = 0self.is_connected = False# 创建 MQTT 客户端并启用持久会话self.client = mqtt.Client(clean_session=False, client_id=CLIENT_ID)# 设置用户名密码self.client.username_pw_set(USERNAME, PASSWORD)# 设置回调函数self.client.on_connect = self.on_connectself.client.on_message = self.on_messageself.client.on_disconnect = self.on_disconnectdef on_connect(self, client, userdata, flags, rc):if rc == 0:print("连接到 broker ")self.is_connected = Trueself.reconnect_attempts = 0self.client.subscribe(self.topic, qos=QOS)  # 重新订阅主题else:print(f"连接失败并返回代码 {rc}")def on_message(self, client, userdata, msg):print(f"在主题 '{msg.topic}' 收到的消息为: '{msg.payload.decode()}'")def on_disconnect(self, client, userdata, rc):self.is_connected = Falseif rc != 0:print("客户端断开。尝试重新连接...")self.reconnect()def reconnect(self):while not self.is_connected and self.reconnect_attempts < self.max_reconnect_attempts:try:wait_time = min(2 ** self.reconnect_attempts, 60) + random.uniform(0, 1)print(f"尝试重新连接 {self.reconnect_attempts + 1}, 等待 {wait_time:.2f} 秒...")time.sleep(wait_time)self.client.reconnect()  # 使用 reconnect 方法重新连接self.is_connected = True  # 如果成功连接,设置 is_connected 为 Trueexcept Exception as e:print(f"重连失败: {e}")self.reconnect_attempts += 1if self.reconnect_attempts >= self.max_reconnect_attempts:print("已达到最大重新连接尝试次数,进入睡眠模式。")# 在这里可以添加进入休眠模式的逻辑def start(self):# 初始连接try:self.client.connect(self.broker_address, self.broker_port, self.keep_alive)except Exception as e:print(f"初始连接失败: {e} 。尝试重新连接...")self.reconnect()# 启动 MQTT 客户端的循环self.client.loop_start()def stop(self):# 停止 MQTT 客户端的循环并断开连接self.client.loop_stop()self.client.disconnect()def publish(self, message):if self.is_connected:self.client.publish(self.topic, message, qos=QOS)if __name__ == "__main__":# 创建并启动 MQTT 客户端mqtt_client = MQTTClient(BROKER_ADDRESS, BROKER_PORT, TOPIC, KEEP_ALIVE_INTERVAL, MAX_RECONNECT_ATTEMPTS)mqtt_client.start()try:while True:message = f"Hello MQTT {i}"mqtt_client.publish(message)i = i + 1time.sleep(1)except KeyboardInterrupt:print("Exiting...")mqtt_client.stop()

三、查看所有的mqtt topic

认证需要API秘钥

image-20240412171132570

API接口:

http://192.168.101.130:18083/api/v5/topics

代码:

import urllib.request
import json
import base64username = 'ee26b0dd4af7e749'
password = 'ALsVuG69AjvFVVU1DoUKC3w0CeP1Nuajrd9CyI3brlCyN'# url = 'http://192.168.101.130:18083/api/v5/nodes'
url = "http://192.168.101.130:18083/api/v5/topics"req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json')auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()
req.add_header('Authorization', auth_header)with urllib.request.urlopen(req) as response:data = json.loads(response.read().decode())print(data)

输出结果:

{'data': [{'node': 'emqx@127.0.0.1', 'topic': 't/1'}, {'node': 'emqx@127.0.0.1', 'topic': '$share/c/t/1'}, {'node': 'emqx@127.0.0.1', 'topic': '$share/a/t/1'}, {'node': 'emqx@127.0.0.1', 'topic': '$share/b/t/1'}], 'meta': {'count': 4, 'limit': 100, 'page': 1, 'hasnext': False}}

四、EMQX 到 Tdengine 数据导入性能测试报告**

测试环境:

  • EMQX 版本:5.5.0 (单体)

  • Tdengine 版本:3.2.2.0 (单体)

  • 操作系统:centos7

  • CPU:4核

    image-20240311162549500

  • 内存:3G

测试步骤:

  1. 在 EMQX 中配置数据源或者编写程序连接TDengine,确保实时数据可以被发送到指定的 Tdengine 数据库中。
  2. 在 Tdengine 数据库中创建相应的表结构,确保能够正确接收和存储实时数据。
  3. 启动数据导入功能,让 EMQX 将实时数据发送到 Tdengine 数据库中。
  4. 使用监控工具监视系统的 CPU 利用率、内存使用情况以及数据导入速率。

测试一:EMQX直接存储到TDengine

使用EMQX订阅EMQX中的Topic,存入到TDengine中

场景一:单线程发布消息

  • 一个客户端发送数据
  • 每10毫秒发送一次
  • 测试发送一万条数据
  • 用时0:01:47.116768
测试结果
  • 数据导入TDengine速率:

    平均每秒处理84.5条数据。

    最大处理95.3条数据

    最小处理55.3条数据

    image-20240314132250870

  • EMQX CPU利用率、 内存使用情况:

    CPU利用率最高26.3%,最低13.2%

    内存使用平均1.66GB

    image-20240314132401998

  • Tdengine CPU利用率、 内存使用情况:

    CPU利用率最高7.67%,最低1.83%

    内存平均668MB

    image-20240314132510169

  • 机器信息:

    image-20240314132550134

场景二:多线程发布消息

  • 五个客户端发送数据
  • 每10毫秒发送一次
  • 测试发送五万条数据
  • 用时0:01:49.660773
测试结果
  • 数据导入TDengine速率:

    平均每秒处理 360条数据。

    最大处理460条数据

    最小处理68.5条数据

    image-20240314135636248

    image-20240314135649631

  • EMQX CPU利用率、 内存使用情况:

    cpu最高百分之四十,最低百分之十

    内存平均1.32G

    image-20240314135455099

  • Tdengine CPU利用率、 内存使用情况:

    cpu最高百分之十二

    内存平均334M

    image-20240314135508596

  • 机器信息:

    image-20240314135749600

测试二:Python订阅消息存储到TDengine

通过Python程序订阅EMQX中的Topic,存入到TDengine中

场景一:单线程发布消息

  • 一个客户端发送数据
  • 每10毫秒发送一次
  • 测试发送一万条数据
  • 用时0:01:48.202740
测试结果:
  • 数据导入TDengine速率:

    平均每秒处理 81.8条数据。

    最大处理87.0条数据

    最小处理77.9条数据

    image-20240315141556759

  • Tdengine CPU利用率、 内存使用情况:

    cpu最高利用率2.17%,平均1.8%

    内存平均M

    image-20240315132225193

  • 机器信息:

    image-20240315141355156

场景二:多线程发布消息

  • 五个客户端发送数据
  • 每50毫秒发送一次
  • 测试发送五万条数据
  • 用时0:01:47.998246
测试结果
  • 数据导入TDengine速率:

    平均每秒处理76条数据。

    最大处理83.1条数据

    最小处理63.9条数据

    image-20240315142549985

  • Tdengine CPU利用率、 内存使用情况:

    cpu最高利用率1.3%,平均1.14%

    内存平均643M

    image-20240315142706079

  • 机器信息:

    image-20240315142836804

测试三:Java订阅消息存储到TDengine

场景一:单线程发布消息

  • 一个客户端发送数据
  • 每10毫秒发送一次
  • 测试发送一万条数据
  • 用时0:01:46.728102
测试结果
  • 数据导入TDengine速率:

    平均每秒处理 79.9条数据。

    最大处理95.5条数据

    最小处理41.8条数据

    image-20240315145341616

  • Tdengine CPU利用率、 内存使用情况:

    cpu最高利用率3.50%,平均2.14%

    内存平均646M

    image-20240315145406259

  • 机器信息:

    image-20240315145429519

场景二:多线程发布消息

  • 五个客户端发送数据
  • 每10毫秒发送一次
  • 测试发送五万条数据
  • 用时0:01:46.728102
测试结果
  • 数据导入TDengine速率:

    平均每秒处理 83.6条数据。

    最大处理95.3条数据

    最小处理48.9条数据

    image-20240315151949104

  • Tdengine CPU利用率、 内存使用情况:

    cpu最高利用率3.05%,平均1.86%

    内存平均657M

    image-20240315152002293

  • 机器信息:

    image-20240315152028787

测试四:Go订阅消息存储到TDengine

场景一:单线程发布消息

  • 一个客户端发送数据
  • 每10毫秒发送一次
  • 测试发送一万条数据
  • 用时0:01:48.395355
测试结果
  • 数据导入TDengine速率:

    平均每秒处理 82.6条数据。

    最大处理95.0条数据

    最小处理50.2条数据

    image-20240318132045758

  • Tdengine CPU利用率、 内存使用情况:

    cpu最高利用率12%,平均4.35%

    内存平均399M

    image-20240318132102394

  • 机器信息:

    image-20240318132135438

场景二:多线程发布消息

  • 五个客户端发送数据
  • 每10毫秒发送一次
  • 测试发送五万条数据
  • 用时0:01:46.728102
测试结果
  • 数据导入TDengine速率:

    平均每秒处理 237条数据。

    最大处理239条数据

    最小处理235条数据

    image-20240318134322446

  • Tdengine CPU利用率、 内存使用情况:

    cpu最高利用率3.43%,平均2.62%

    内存平均423M

    image-20240318134342581

  • 机器信息:

    image-20240318134402890

结果:

emqx直接连接TDenginepython订阅消息存储到TDengineJava订阅消息存储到TDenginego订阅消息存储到TDengine
单线程发布消息平均每秒84.5条,最大95.3条平均每秒81.8条,最大87条平均每秒79.9条,最大95.5条平均每秒86.2条,最大95.0条
多线程发布消息平均每秒 360条,最大460条平均每秒76条,最大83.1条平均每秒83.6条,最大95.3条平均每秒237条,最大239条

结论:

存储到TDengine速率:emqx>go>java>python

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/90051.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/90051.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从分析到优化:Amazon Q CLI 助力 EKS 网络调用链剖析与运维实践

1. 引言 在 Amazon EKS&#xff08;Elastic Kubernetes Service&#xff09;环境中&#xff0c;理解从 ALB&#xff08;Application Load Balancer&#xff09;到 Pod 的完整网络调用链对运维人员至关重要。本文将展示如何利用 Amazon Q CLI 这一 AI 助手工具&#xff0c;通过…

Class10简洁实现

Class10简洁实现 import torch from torch import nn from d2l import torch as d2l# 输入为28*28&#xff0c;输出为10类&#xff0c;第1、2隐藏层256神经元 num_inputs, num_outputs, num_hiddens1, num_hiddens2 784, 10, 256, 256 # 第1个隐藏层丢弃率为0.2&#xff0c;第…

【多线程篇22】:ConcurrentHashMap的并发安全原理剖析

文章目录一、HashMap 的“不安全”&#xff1a;问题的根源1. 数据结构回顾 (JDK 1.8)2. 并发下的致命缺陷&#xff1a;put 操作二、ConcurrentHashMap 的安全之道 (JDK 1.8)1. 核心数据结构2. 安全的 put 操作&#xff1a;分场景精细化加锁3. 安全的 size() 计算&#xff1a;并…

【Java + Vue 实现图片上传后 导出图片及Excel 并压缩为zip压缩包】

系统环境&#xff1a; Java JDK&#xff1a;1.8.0_202 Node.js&#xff1a;v12.2.0 Npm&#xff1a;6.9.0 Java后端实现 Controller /*** xxxx-导出* param response 返回信息体* param files 上传的图片文件* param param1 参数1* param param2 参数2*/PostMapping("/ex…

安科瑞:能源微电网助力工业园区“绿色”发展

朱以真近日&#xff0c;厦门市工业和信息化局印发工业园区绿色智慧微电网建设&#xff0c;拟开展全市工业园区绿色智慧微电网试点通知&#xff0c;那么对于如何实现绿色园区的建设是今天的话题。对工业园区绿色智慧微电网建设需求&#xff0c;其核心价值体现在“源-网-荷-储-充…

VUE2 学习笔记3 v-on、事件修饰符、键盘事件

事件处理v-on用于事件交互。语法&#xff1a;v-on:要绑定的事件“事件触发时执行的函数” &#xff08;函数这里可以写括号&#xff0c;也可以不写&#xff0c;没有影响&#xff09;简写&#xff1a;:事件触发时要执行的函数&#xff0c;在Vue配置参数中&#xff0c;通过method…

变换域通讯系统CCSK的matlab仿真

CCSK&#xff08;Cyclic Code Shift Keying&#xff09;通信系统的MATLAB仿真。实现完整的CCSK调制、AWGN信道传输和解调过程&#xff0c;并计算了误码率&#xff08;BER&#xff09;。 % CCSK通信系统仿真 clear; clc; close all;% 参数设置 L 31; % m序列…

技术演进中的开发沉思-40 MFC系列:多线程协作

今天说说MFC的线程&#xff0c;当年用它实现中间件消息得心应手之时&#xff0c;可以实现一边实时接收数据&#xff0c;一边更新界面图表图文信息&#xff0c;顺滑得让人想吹声口哨。 MFC 多线程它像给程序装上了分身术&#xff0c;让原本只能 “单任务跑腿” 的代码&#xff0…

高速公路自动化安全监测主要内容

近年来&#xff0c;随着社会经济的快速发展&#xff0c;高速公路的通车里程不断增加&#xff0c;交通流量日益增大。与此同时&#xff0c;高速公路交通事故数量也呈现出一定的增长趋势。这些事故不仅造成了大量的人员伤亡和财产损失&#xff0c;还严重影响了社会的稳定和经济的…

完美解决 Ubuntu 中自定义启动器图标重复的问题(以 MATLAB 为例)

如果你在 Ubuntu 上为 MATLAB、PyCharm、Android Studio 或其他第三方应用创建了自定义启动器&#xff0c;并把它固定到了左侧的 Dock 栏&#xff0c;你很可能遇到过这个令人抓狂的场景&#xff1a; 当你满心欢喜地点击固定的图标启动程序后&#xff0c;Dock 栏上竟然“凭空”冒…

一文读懂深度模型优化器,掌握炼丹工具

深度模型优化器是训练神经网络的核心工具&#xff0c;其目标是高效地找到损失函数的最小值。从基础的随机梯度下降&#xff08;SGD&#xff09;到结合一阶动量修正与两阶段更新的Ranger&#xff0c;优化器的发展始终围绕着加速收敛、提升稳定性、适应参数差异和增强泛化能力四大…

关于虚拟机部署的k8s集群calico组件在开机恢复快照时的问题

在恢复虚拟机开机快照的时候总会出现各种的网络问题&#xff0c;连接不到自己的443端口&#xff0c;集群状态异常等等&#xff0c;绕一圈都发现不了问题&#xff0c;其实根本就没问题。虚拟机环境再现&#xff0c;恢复快照后的k8s环境&#xff0c;nfs莫名宕掉提示此报错&#x…

从感知到决策:虚拟仿真系统与视觉算法融合下的多路RTSP视频接入技术探究

1️⃣ 背景概述 随着国防信息化和智能化进程的不断加快&#xff0c;虚拟仿真系统 与 智能视觉算法平台 正逐步成为现代化装备研发、测试验证与战术训练的重要技术支撑。相比传统的静态建模或离线推演&#xff0c;新一代作战仿真与智能系统更强调 实时性、感知能力与动态交互&a…

Android app如果不适配Android 15会怎么样

深度思考完成 用时49秒 首先&#xff0c;用户的问题是&#xff1a;“Android app如果不适配Android 15会怎么样&#xff1f;”我需要基于搜索结果来回答。 让我回顾搜索结果&#xff1a; ‌1&#xff1a;标题是“Android 15应用适配指南”&#xff0c;内容强调了不适配Androi…

Vue-21-利用Vue3的axios+Python的flask实现前后端交互功能

文章目录 1 任务需求 2 前端 2.1 模板template 2.2 JS代码script 2.3 Calculate.vue(子组件) 2.4 App.vue(根组件) 3 后端 3.1 导入模块 3.2 创建应用实例 3.3 配置CORS 3.4 定义路由 3.5 处理请求 3.6 main.py 4 附录 4.1 CORS 4.1.1 全局启用CORS 4.1.2 限制允许的域名(更安…

动态规划之最长回文子串

题目&#xff1a;最长回文子串 给你一个字符串 s&#xff0c;找到 s 中最长的 回文 子串。 示例 1&#xff1a; 输入&#xff1a;s “babad” 输出&#xff1a;“bab” 解释&#xff1a;“aba” 同样是符合题意的答案。 示例 2&#xff1a; 输入&#xff1a;s “cbbd” 输…

Linux 编程中的错误处理机制详解 —— `errno` 全解析

文章目录Linux 编程中的错误处理机制详解 —— errno 全解析一、什么是 errno&#xff1f;❓为什么需要 errno&#xff1f;✅ 它在哪里定义&#xff1f;二、errno 的设置与读取规则⚠️ errno 不是总是有效&#xff01;❗使用 errno 的正确步骤&#xff1a;三、与 errno 配套使…

力扣-最长递增子序列

简单记录学习~给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7] 的子序列。示例…

公司内部网址怎么在外网打开?如何让外网访问内网的网站呢?

很多公司内部本地会部署有中小型的服务器&#xff0c;可以很好的方便用于一些办公业务系统&#xff0c;或测试开发需要。在数字化办公和生活场景中&#xff0c;除了公司内部局域网内访问公司系统外&#xff0c;经常会遇到需要让外网访问内网网站的情况。比如企业员工远程办公时…

有趣的css - 多选立体标签按钮

&#x1f36d; 大家好&#xff0c;我是 Just&#xff0c;这里是「设计师工作日常」&#xff0c;今天分享的是一个交互较完整的多选立体标签按钮。 最新文章通过公众号「设计师工作日常」发布。 目录整体效果核心代码html 代码css 部分代码完整代码如下html 页面css 样式页面渲…