Python + Pika RabbitMQ集群压测完整方案

在这里插入图片描述在这里插入图片描述

一、最近搭建了个rabbitmq集群 三个磁盘节点,上生产环境之前想做个压测,测试下稳定性,参考Deepseek做了如下测试方案
在这里插入图片描述
二、核心代码实现:

  1. 配置文件 (config.py)
import os
RABBITMQ_NODES = ['amqp://admin:123456@192.168.0.175:8101',
]QUEUE_CONFIG = {'queue_name': 'stress_test_queue','durable': True,'arguments': {'x-ha-policy': 'all',  # 镜像队列'x-queue-mode': 'lazy'  # 惰性队列}
}TEST_PARAMS = {'producer_count': 20,      # 生产者进程数'consumer_count': 30,      # 消费者进程数'message_size': 1024,      # 消息大小(字节) 1KB'total_messages': 1000000, # 总消息量'test_duration': 600       # 测试时长(秒) 10分钟
}
LOG_DIR = "logs"
CONSUMER_LOG_FILE = os.path.join(LOG_DIR, "consumer_stress_test.log")
PRODUCER_LOG_FILE = os.path.join(LOG_DIR, "producer_stress_test.log")
  1. 生产者实现 (producer.py)
import pika
import time
import random
import os
from config import RABBITMQ_NODES, QUEUE_CONFIG, TEST_PARAMS, PRODUCER_LOG_FILE# 确保日志目录存在
os.makedirs(os.path.dirname(PRODUCER_LOG_FILE), exist_ok=True)def log_producer(message):"""记录生产者日志"""print(message)with open(PRODUCER_LOG_FILE, 'a', encoding='utf-8') as f:f.write(message + '\n')def produce_messages():"""生产者函数"""# 随机选择集群节点node = random.choice(RABBITMQ_NODES)params = pika.URLParameters(node)try:connection = pika.BlockingConnection(params)channel = connection.channel()# 声明队列channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],durable=QUEUE_CONFIG['durable'],arguments=QUEUE_CONFIG['arguments'])start_time = time.time()message_count = 0pid = os.getpid()# 生成测试消息message_body = os.urandom(TEST_PARAMS['message_size'])while (time.time() - start_time) < TEST_PARAMS['test_duration']:try:# 发送消息channel.basic_publish(exchange='',routing_key=QUEUE_CONFIG['queue_name'],body=message_body,properties=pika.BasicProperties(delivery_mode=2,  # 持久化消息timestamp=int(time.time() * 1000)  # 毫秒时间戳))message_count += 1# 每1000条打印一次if message_count % 1000 == 0:current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())log_producer(f"{current_time}, [Producer {pid}] 发送: {message_count}")except Exception as e:log_producer(f"生产者错误: {str(e)}")# 重连机制try:if connection and connection.is_open:connection.close()connection = pika.BlockingConnection(params)channel = connection.channel()except Exception as e:log_producer(f"重连失败: {str(e)}")time.sleep(1)connection.close()log_producer(f"[Producer {pid}] 发送完成,总计发送: {message_count} 条消息")return message_countexcept Exception as e:log_producer(f"生产者初始化错误: {str(e)}")return 0
if __name__ == "__main__":produce_messages()
  1. 消费者实现 (consumer.py)
import pika
import time
import random
import os
import threading
from config import RABBITMQ_NODES, QUEUE_CONFIG, TEST_PARAMS, CONSUMER_LOG_FILEos.makedirs(os.path.dirname(CONSUMER_LOG_FILE), exist_ok=True)consumed_count = 0
total_latency = 0
min_latency = float('inf')
max_latency = 0connections = {}
lock = threading.Lock()def log_consumer(message):"""记录消费者日志"""print(message)with open(CONSUMER_LOG_FILE, 'a', encoding='utf-8') as f:f.write(message + '\n')def get_channel():"""获取或创建连接和通道"""with lock:pid = os.getpid()if pid not in connections or not connections[pid].is_open:node = random.choice(RABBITMQ_NODES)params = pika.URLParameters(node)connections[pid] = pika.BlockingConnection(params)return connections[pid].channel()def on_message(channel, method, properties, body):global consumed_count, total_latency, min_latency, max_latencycurrent_ts = time.time() * 1000latency = current_ts - properties.timestampconsumed_count += 1total_latency += latencymin_latency = min(min_latency, latency)max_latency = max(max_latency, latency)if consumed_count % 1000 == 0:pid = os.getpid()avg_latency = total_latency / consumed_countcurrent_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())log_consumer(f"{current_time}, [Consumer {pid}] 消费: {consumed_count}, 平均延迟: {avg_latency:.2f}ms")# 确认消息channel.basic_ack(delivery_tag=method.delivery_tag)def start_consumer():"""启动消费者"""try:channel = get_channel()# QoS设置channel.basic_qos(prefetch_count=100)channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],durable=QUEUE_CONFIG['durable'],arguments=QUEUE_CONFIG['arguments'])channel.basic_consume(queue=QUEUE_CONFIG['queue_name'],on_message_callback=on_message)log_consumer(f"消费者启动 PID: {os.getpid()}")channel.start_consuming()except Exception as e:log_consumer(f"消费者错误: {str(e)}")# 尝试重新连接time.sleep(5)start_consumer()
if __name__ == "__main__":start_consumer()```
  1. 压测控制器 (controller.py)
import multiprocessing
import time
import os
from producer import produce_messages
from consumer import start_consumer
from config import TEST_PARAMS, CONSUMER_LOG_FILE
from report_generator import ReportGenerator  # 确保导入修复后的报告生成器def run_producers():"""启动生产者进程"""producers = []for _ in range(TEST_PARAMS['producer_count']):p = multiprocessing.Process(target=produce_messages)p.start()producers.append(p)return producersdef run_consumers():"""启动消费者进程"""consumers = []for _ in range(TEST_PARAMS['consumer_count']):p = multiprocessing.Process(target=start_consumer)p.start()consumers.append(p)return consumersdef monitor():"""监控压测过程"""start_time = time.time()while time.time() - start_time < TEST_PARAMS['test_duration'] + 10:elapsed = time.time() - start_timeprint(f"[监控] 压测已运行: {int(elapsed)}秒 / {TEST_PARAMS['test_duration']}秒")time.sleep(5)def main():"""主控制函数"""print("=" * 60)print("RabbitMQ集群压测控制器")print("=" * 60)# 确保日志目录存在os.makedirs(os.path.dirname(CONSUMER_LOG_FILE), exist_ok=True)# 清空日志文件open(CONSUMER_LOG_FILE, 'w').close()print("启动消费者...")consumers = run_consumers()time.sleep(5)  # 等待消费者就绪print("启动生产者...")producers = run_producers()print("启动监控...")monitor()print("压测完成,终止进程...")for p in producers:if p.is_alive():p.terminate()for c in consumers:if c.is_alive():c.terminate()print("所有进程已终止")# 生成压测报告print("\n" + "=" * 60)print("生成压测报告...")generator = ReportGenerator(CONSUMER_LOG_FILE)report_path = generator.generate_report()if report_path:print(f"报告已生成: {report_path}")else:print("报告生成失败")
if __name__ == "__main__":main()

5、日志解析器 (report_generator.py)

import pandas as pd
import matplotlib# 设置为无头模式,避免GUI问题
matplotlib.use('Agg')  # 必须在导入pyplot之前设置
import matplotlib.pyplot as plt
from datetime import datetime
import re
import os
import numpy as np
import sys
import platform# 检查并导入兼容的 docx 模块
try:from docx import Documentfrom docx.shared import Inches, Ptfrom docx.enum.text import WD_ALIGN_PARAGRAPHfrom docx.enum.table import WD_TABLE_ALIGNMENTDOCX_SUPPORT = True
except ImportError as e:print(f"警告: 无法导入 python-docx 库,Word 报告功能将不可用。错误: {str(e)}")print("请运行: pip install python-docx --upgrade")DOCX_SUPPORT = Falseclass ReportGenerator:def __init__(self, log_file='consumer_stress_test.log'):# 获取桌面路径if platform.system() == 'Windows':self.desktop_path = os.path.join(os.environ['USERPROFILE'], 'Desktop')else:self.desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop')self.log_file = log_fileself.report_docx = os.path.join(self.desktop_path, 'RabbitMQ集群压测分析报告.docx')self.report_image = os.path.join(self.desktop_path, 'RabbitMQ压测性能图表.png')# 设置中文字体支持plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']  # 多个回退选项plt.rcParams['axes.unicode_minus'] = Falsedef parse_logs(self):"""解析日志文件,提取时间戳、消费数量和延迟数据"""print(f"正在解析日志文件: {self.log_file}")data = []line_count = 0parsed_count = 0if not os.path.exists(self.log_file):print(f"错误: 日志文件不存在 - {self.log_file}")return pd.DataFrame()# 尝试多种编码encodings = ['utf-8', 'gbk', 'latin-1', 'cp1252']for encoding in encodings:try:with open(self.log_file, 'r', encoding=encoding) as f:for line in f:line_count += 1# 尝试多种可能的日志格式if 'Consumed' in line or '消费' in line or '已处理' in line or '处理' in line:try:# 尝试解析时间戳 - 支持多种格式timestamp_match = re.search(r'(\d{4}[-/]\d{1,2}[-/]\d{1,2} \d{1,2}:\d{1,2}:\d{1,2})',line)if timestamp_match:timestamp_str = timestamp_match.group(1)try:timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")except:timestamp = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M:%S")# 解析消息数量count = 0count_match = re.search(r'(Consumed|消费|已处理|处理):?\s*(\d+)', line,re.IGNORECASE)if count_match:count = int(count_match.group(2))else:# 尝试直接查找数字num_match = re.search(r'\b(\d{3,})\b', line)if num_match:count = int(num_match.group(1))# 解析延迟latency = 0latency_match = re.search(r'(avg|平均|延迟|latency)[=::]?\s*([\d.]+)(ms|毫秒)?',line, re.IGNORECASE)if latency_match:latency = float(latency_match.group(2))data.append({'timestamp': timestamp,'count': count,'latency': latency})parsed_count += 1except Exception as e:print(f"解析错误 (行 {line_count}): {line.strip()} - {str(e)}")if parsed_count > 0:print(f"使用 {encoding} 编码成功解析 {parsed_count} 行日志")breakelse:print(f"使用 {encoding} 编码未找到有效数据")except UnicodeDecodeError:print(f"尝试 {encoding} 编码失败,尝试下一种...")continueexcept Exception as e:print(f"解析日志时发生错误: {str(e)}")return pd.DataFrame(data)def generate_charts(self, df):"""生成分析图表并保存到桌面"""if df.empty:print("没有有效数据,无法生成图表")return dfprint("正在生成分析图表...")try:plt.figure(figsize=(16, 12))# 消息消费速率plt.subplot(2, 2, 1)df = df.sort_values('timestamp')  # 确保按时间排序# 计算时间差和消息数差df['time_diff'] = df['timestamp'].diff().dt.total_seconds().fillna(0)df['count_diff'] = df['count'].diff().fillna(0)# 计算消息消费速率(避免除以零)df['msg_rate'] = np.where(df['time_diff'] > 0,df['count_diff'] / df['time_diff'],0)# 处理无限值和NaN值df['msg_rate'] = df['msg_rate'].replace([np.inf, -np.inf], np.nan)df['msg_rate'] = df['msg_rate'].ffill().bfill().fillna(0)plt.plot(df['timestamp'], df['msg_rate'], 'b-')plt.title('消息消费速率 (条/秒)', fontsize=14)plt.xlabel('时间', fontsize=12)plt.ylabel('消息数/秒', fontsize=12)plt.grid(True)# 累计消费消息数plt.subplot(2, 2, 2)plt.plot(df['timestamp'], df['count'], 'g-')plt.title('累计消费消息数', fontsize=14)plt.xlabel('时间', fontsize=12)plt.ylabel('总消息数', fontsize=12)plt.grid(True)# 延迟分布plt.subplot(2, 2, 3)plt.hist(df['latency'], bins=50, color='orange', alpha=0.7)plt.title('延迟分布', fontsize=14)plt.xlabel('延迟 (毫秒)', fontsize=12)plt.ylabel('频率', fontsize=12)plt.grid(True)# 延迟随时间变化plt.subplot(2, 2, 4)plt.plot(df['timestamp'], df['latency'], 'r-')plt.title('延迟随时间变化', fontsize=14)plt.xlabel('时间', fontsize=12)plt.ylabel('延迟 (毫秒)', fontsize=12)plt.grid(True)plt.tight_layout()plt.savefig(self.report_image, dpi=300)print(f"分析图表已保存到: {self.report_image}")except Exception as e:print(f"生成图表时出错: {str(e)}")import tracebacktraceback.print_exc()return dfdef create_text_report(self, df):"""创建文本格式的报告"""report_file = os.path.join(self.desktop_path, 'RabbitMQ压测分析报告.txt')with open(report_file, 'w', encoding='utf-8') as f:f.write("=" * 60 + "\n")f.write("RabbitMQ集群压测分析报告\n")f.write("=" * 60 + "\n\n")f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")f.write(f"日志文件: {os.path.abspath(self.log_file)}\n\n")if df.empty:f.write("没有有效数据,无法生成报告\n")return report_file# 计算统计信息start_time = df['timestamp'].min()end_time = df['timestamp'].max()duration = (end_time - start_time).total_seconds()total_messages = df['count'].max() - df['count'].min()throughput = total_messages / duration if duration > 0 else 0# 延迟统计if not df['latency'].empty:latency_min = df['latency'].min()latency_max = df['latency'].max()latency_avg = df['latency'].mean()latency_90 = df['latency'].quantile(0.9)else:latency_min = latency_max = latency_avg = latency_90 = 0# 总体统计f.write("一、压测总体统计\n")f.write("-" * 50 + "\n")f.write(f"测试开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")f.write(f"测试结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")f.write(f"测试持续时间: {duration:.2f} 秒\n")f.write(f"总消费消息数: {total_messages:,} 条\n")f.write(f"平均吞吐量: {throughput:.2f} 条/秒\n")f.write(f"最小延迟: {latency_min:.2f} 毫秒\n")f.write(f"平均延迟: {latency_avg:.2f} 毫秒\n")f.write(f"最大延迟: {latency_max:.2f} 毫秒\n")f.write(f"90%延迟: {latency_90:.2f} 毫秒\n\n")# 分析结论f.write("二、性能分析结论\n")f.write("-" * 50 + "\n")# 吞吐量评估if throughput == 0:f.write("吞吐量评估: 未检测到有效吞吐量数据\n")elif throughput > 5000:f.write("吞吐量评估: 优秀 (>5000条/秒)\n")elif throughput > 2000:f.write("吞吐量评估: 良好 (2000-5000条/秒)\n")else:f.write("吞吐量评估: 需优化 (<2000条/秒)\n")# 延迟评估if latency_avg == 0:f.write("延迟评估: 未检测到延迟数据\n")elif latency_avg < 50:f.write("延迟评估: 优秀 (<50ms)\n")elif latency_avg < 200:f.write("延迟评估: 良好 (50-200ms)\n")else:f.write("延迟评估: 需优化 (>200ms)\n")# 稳定性评估if latency_90 > 0 and latency_avg > 0:stability_ratio = latency_90 / latency_avgf.write(f"稳定性评估: 90%延迟是平均延迟的 {stability_ratio:.1f} 倍\n")if stability_ratio < 2:f.write("稳定性评估: 系统稳定性优秀\n")elif stability_ratio < 5:f.write("稳定性评估: 系统稳定性良好\n")else:f.write("稳定性评估: 系统稳定性较差\n")# 优化建议f.write("\n三、优化建议\n")f.write("-" * 50 + "\n")suggestions = ["1. 增加消费者数量以提高吞吐量","2. 提高预取值(prefetch count)优化消费速度","3. 检查网络延迟,确保RabbitMQ集群节点间通信正常","4. 使用消息批量处理减少网络开销","5. 优化消费者代码,减少处理时间"]for suggestion in suggestions:f.write(suggestion + "\n")# 图表信息if os.path.exists(self.report_image):f.write("\n四、性能图表\n")f.write("-" * 50 + "\n")f.write(f"性能图表已保存到: {self.report_image}\n")print(f"文本报告已保存到: {report_file}")return report_filedef generate_report(self):"""生成完整的压测报告"""print("=" * 60)print("RabbitMQ压测报告生成工具")print("=" * 60)# 解析日志df = self.parse_logs()if df.empty:print("没有有效数据,无法生成报告")report_file = os.path.join(self.desktop_path, 'RabbitMQ压测分析报告.txt')with open(report_file, 'w', encoding='utf-8') as f:f.write("没有从日志文件中提取到有效数据,请检查日志格式\n")f.write(f"日志文件: {os.path.abspath(self.log_file)}\n")print(f"错误报告已保存到: {report_file}")return report_file# 生成图表self.generate_charts(df)# 生成报告return self.create_text_report(df)if __name__ == "__main__":import syslog_file = sys.argv[1] if len(sys.argv) > 1 else 'consumer_stress_test.log'generator = ReportGenerator(log_file)report_path = generator.generate_report()if report_path:print(f"\n报告已生成: {report_path}")else:print("\n报告生成失败")

6、 监控脚本 (monitor.py)

import pika
import time
import json
from config import RABBITMQ_NODES, QUEUE_CONFIGdef monitor_queue():while True:try:connection = pika.BlockingConnection(pika.URLParameters(random.choice(RABBITMQ_NODES)))channel = connection.channel()# 获取队列状态queue = channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],passive=True)messages_ready = queue.method.message_countprint(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 积压消息: {messages_ready}")connection.close()time.sleep(5)except Exception as e:print(f"监控错误: {str(e)}")time.sleep(10)if __name__ == "__main__":monitor_queue()

7、运行压测程序:

python controller.py

8、结果展示

RabbitMQ集群压测分析报告
生成时间: 2025-08-07 13:19:23
日志文件: C:\Users\Administrator\PycharmProjects\pythonProject13\rabbitmq\logs\consumer_stress_test.log
一、压测总体统计
--------------------------------------------------
测试开始时间: 2025-08-07 13:09:27
测试结束时间: 2025-08-07 13:19:21
测试持续时间: 594.00 秒
总消费消息数: 137,000 条
平均吞吐量: 230.64 条/秒
最小延迟: 137895.78 毫秒
平均延迟: 673436.15 毫秒
最大延迟: 4793312.15 毫秒
90%延迟: 1471724.25 毫秒二、性能分析结论
--------------------------------------------------
吞吐量评估: 需优化 (<2000条/秒)
延迟评估: 需优化 (>200ms)
稳定性评估: 90%延迟是平均延迟的 2.2 倍
稳定性评估: 系统稳定性良好三、优化建议
--------------------------------------------------
1. 增加消费者数量以提高吞吐量
2. 提高预取值(prefetch count)优化消费速度
3. 检查网络延迟,确保RabbitMQ集群节点间通信正常
4. 使用消息批量处理减少网络开销
5. 优化消费者代码,减少处理时间四、性能图表
--------------------------------------------------
性能图表已保存到: C:\Users\Administrator\Desktop\RabbitMQ压测性能图表.png

图表展示
在这里插入图片描述
这个方案有个缺陷就是测试完成后 还会有很多message堆积在那里,没有被消费掉,所以我做了一个消费脚本,内容如下:

import pika
import threading
import random
import time
from config import RABBITMQ_NODES, QUEUE_CONFIG# 连接池管理
connection_pool = []
lock = threading.Lock()def get_connection():"""获取或创建连接"""with lock:if connection_pool:return connection_pool.pop()else:node = random.choice(RABBITMQ_NODES)return pika.BlockingConnection(pika.URLParameters(node))def release_connection(connection):"""释放连接到连接池"""with lock:if connection.is_open:connection_pool.append(connection)def consume_fast():"""高效消费消息的函数"""try:# 从连接池获取连接connection = get_connection()channel = connection.channel()# 设置高预取值channel.basic_qos(prefetch_count=1000)# 定义消息处理回调def callback(ch, method, properties, body):# 这里可以添加实际的消息处理逻辑# 例如: process_message(body)# 立即确认消息(更安全)ch.basic_ack(delivery_tag=method.delivery_tag)# 开始消费channel.basic_consume(queue=QUEUE_CONFIG['queue_name'],on_message_callback=callback,auto_ack=False  # 手动确认)print(f"[{threading.current_thread().name}] 快速消费者启动...")channel.start_consuming()except Exception as e:print(f"消费者错误: {str(e)}")finally:# 释放连接回连接池if 'connection' in locals() and connection.is_open:release_connection(connection)if __name__ == "__main__":# 初始化连接池for _ in range(10):  # 创建10个初始连接node = random.choice(RABBITMQ_NODES)connection_pool.append(pika.BlockingConnection(pika.URLParameters(node)))print(f"已创建 {len(connection_pool)} 个初始连接")# 启动多个快速消费者线程threads = []for i in range(20):  # 启动20个消费者线程t = threading.Thread(target=consume_fast,name=f"FastConsumer-{i + 1}",daemon=True)t.start()threads.append(t)time.sleep(0.1)  # 避免同时启动所有线程print("所有消费者线程已启动,按 Ctrl+C 停止...")# 监控积压消息try:while True:# 检查队列积压try:connection = get_connection()channel = connection.channel()queue = channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],passive=True)backlog = queue.method.message_countprint(f"当前积压消息: {backlog}")release_connection(connection)# 如果积压减少,可以考虑减少消费者if backlog < 1000:print("积压已减少,可以安全停止脚本")breakexcept Exception as e:print(f"监控错误: {str(e)}")time.sleep(5)  # 每5秒检查一次except KeyboardInterrupt:print("\n正在停止消费者...")# 清理print("关闭所有连接...")for conn in connection_pool:if conn.is_open:conn.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92439.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92439.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【第7话:相机模型3】自动驾驶IPM图像投影拼接技术详解及代码示例

IPM图像投影拼接技术详解 IPM&#xff08;逆透视映射&#xff09;图像投影拼接技术是一种在计算机视觉中广泛应用的图像处理方法&#xff0c;主要用于将多个透视视图的图像转换为鸟瞰视图并拼接成一个无缝的大场景图像。该技术特别适用于自动驾驶、机器人导航和监控系统等领域&…

【测试工程思考】测试自动化基础能力建设

1 回顾 传统软件研发体系下定义的软件测试是从用户视角设计的。测试是试图穷尽用户行为的工程&#xff0c;从测试用例&#xff08;use case&#xff09;的英文定义就可见一般。测试的逻辑资产就是用自然语言去描述用户的操作行为或路径。 但随着软件工程向分布式架构和敏捷交付…

进阶向:AI聊天机器人(NLP+DeepSeek API)

什么是AI聊天机器人? AI聊天机器人是一种通过自然语言处理(NLP)技术模拟人类对话的智能程序系统。其核心是建立在机器学习算法和大型语言模型基础上的对话引擎,能够理解用户的自然语言输入,分析语境和意图,并生成符合上下文的相关回复。 这类机器人系统通常包含以下几个…

一个C#的段子

猜猜按钮的结果是啥。 public partial class Form1 : Form { public Form1() { InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) { } public static bool flag true; privat…

使用 gptqmodel 量化 Qwen3-Coder-30B-A3B-Instruct

代码部分 : quantize_qwen3_coder_30b_a3b_instruct_gptq.py import os########## 环境变量设置 ########## # 当前可用的 CUDA 编号 os.environ["CUDA_VISIBLE_DEVICES"] "1" # GPU 显存资源片段优化 os.environ["PYTORCH_CUDA_ALLOC_CONF"] …

基于python、django的疫苗接种管理系统

基于python、django的疫苗接种管理系统

Go语言实战案例:使用sync.Map构建线程安全map

在并发编程中&#xff0c;共享资源的访问是一个绕不开的问题。Go 中的 map 在并发读写时是不安全的&#xff0c;直接使用可能导致程序 panic。因此&#xff0c;在多协程同时访问 Map 的场景下&#xff0c;必须采取有效的同步措施。本篇将通过一个实战案例&#xff0c;介绍 Go 的…

关于vue2中对接海康摄像头以及直播流rtsp或rtmp,后台ffmpeg转码后通过ws实现

最近项目中需要对接摄像头监控&#xff0c;海康摄像头为rtsp流格式有一个软件VLC media player&#xff0c;可以在线进行rtsp或者rtmp流播放&#xff0c;可用来测试流地址是否可用功能实现思路为后台通过fmpeg把rtsp流进行转码&#xff0c;然后通过ws方式进行一帧一帧推送。&am…

Docker容器强制删除及文件系统修复完整指南

Docker容器强制删除及文件系统修复完整指南 故障现象与原因分析 ​故障表现​&#xff1a; ERROR: for c9ca40be974d_OpIsosMD_OB unable to remove filesystem unlinkat /data/docker/storage/containers/c9ca40be974d...: structure needs cleaning​根本原因​&#xff1a;…

Matplotlib 知识点总结

1. 基础绘图&#xff08;plot函数&#xff09;基本语法&#xff1a;plot([x], y, [fmt], [x2], y2, [fmt2], ..., **kwargs)功能特点&#xff1a;可绘制点、线和组合图形自动生成x轴&#xff08;0-N-1&#xff09;当x未指定时示例&#xff1a;绘制两点连线、多点不规则线等代码…

高可用微服务架构实战:Nacos集群+Nginx负载均衡,Spring Cloud无缝对接

"当你的注册中心挂了&#xff0c;整个微服务就变成了无头苍蝇。" 这是我在生产环境踩坑后最痛的领悟。今天&#xff0c;我将分享如何用Nacos集群Nginx搭建坚如磐石的注册中心&#xff0c;让你的微服务永不迷路&#xff01; 在 Windows 环境下配置 Nacos 集群&#x…

Spark大数据处理实战指南

Spark 简介 Apache Spark 是一个开源的分布式计算框架,专为大规模数据处理而设计。它通过内存计算和优化的执行引擎显著提升了数据处理速度,适用于批处理、实时流处理、机器学习和图计算等场景。 核心特性 高性能:利用内存计算(In-Memory Processing)减少磁盘 I/O,比传…

浏览器缓存机制全解析:强缓存与协商缓存

浏览器缓存是浏览器为提升页面加载速度、减少服务器压力和节省网络带宽&#xff0c;在本地存储资源&#xff08;如 HTML、CSS、JS、图片等&#xff09;的机制。其核心分为强缓存和协商缓存&#xff0c;并涉及多种 HTTP 头字段和存储位置。以下是详细解析&#xff1a;⚙️ 一、缓…

知识随记-----Qt 实用技巧:自定义倒计时按钮防止用户频繁点击

Qt 技巧&#xff1a;实现自定义倒计时按钮防止用户频繁点击注册 项目场景 在一个基于 Qt 开发的聊天应用中&#xff0c;用户注册时需要获取验证码。为防止用户频繁点击获取验证码按钮&#xff0c;需要实现一个倒计时功能&#xff0c;用户点击后按钮进入倒计时状态&#xff0c;倒…

Linux与Windows应急响应

本人首先进行了linux的应急响应&#xff0c;windows之后再进行 Linux与Windows应急响应初体验1 linux应急响应1.1 账户&#xff1a;1.1.1 使用cat /etc/passwd命令查看passwd文件2.1.2 使用cat /etc/shadow命令查找shadow文件&#xff0c;该文件为密码文件的存储项1.2 入侵排查…

计算机网络1-4:计算机网络的定义和分类

目录 计算机网络的定义 计算机网络的分类 计算机网络的定义 计算机网络的分类 按交换技术分类&#xff1a;电路交换网络、报文交换网络、分组交换网络 按使用者分类&#xff1a;公用网、专用网 按传输介质分类&#xff1a;有线网络、无线网络 按覆盖范围分类&#xff1a;…

在QT中动态添加/删除控件,伸缩因子该怎么处理

开发中遇到的问题[TOC](开发中遇到的问题)处理方式在我们的界面开发过程中&#xff0c;通常需要开发一些可以动态添加or删除控件的容器&#xff0c;类似Tab页一样&#xff0c;为了美观的话&#xff0c;我们通常使用伸缩因子将容器中的控件往一个方向挤&#xff0c;类似下面的控…

【设计模式精解】什么是代理模式?彻底理解静态代理和动态代理

目录 静态代理 动态代理 JDK动态代理 CGLIB代理 JDK动态代理和CGLIB代理的区别 总结 代理模式简单来说就是 我们使用代理对象来代替对真实对象(real object)的访问&#xff0c;这样就可以在不修改原目标对象的前提下&#xff0c;扩展目标对象的功能。 代理模式有静态代理…

MCU AI/ML - 弥合智能和嵌入式系统之间的差距

作者&#xff1a;芯科科技产品营销高级经理Gopinath Krishniah 人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;是使系统能够从数据中学习、进行推理并随着时间的推移提高性能的关键技术。这些技术通常用于大型数据中心和功能强大的GPU&#xff0c;但…

Redis中的sdshdr的len和alloc那块的知识点详解

文章目录核心比喻&#xff1a;一个可以伸缩的水瓶场景一&#xff1a;创建一个新字符串场景二&#xff1a;追加字符串&#xff08;触发“空间预分配”&#xff09;场景三&#xff1a;再次追加字符串&#xff08;利用空闲空间&#xff09;场景四&#xff1a;缩短字符串&#xff0…