文章总览
为什么做备份更新
为机器人控制器设计一套打包备份更新机制,为控制器的批量生产和产品与项目落地做准备。
当某个模块出现bug需要升级时,用户可以快速获取正确的bak包并导入到控制器中重启生效。
如果没有做好软件的备份更新机制,解决问题时,需要重新烧录整个系统、或者费时费力地从源代码开始找问题然后修改编译,期间系统完全瘫痪。
哪些包计划更新
1、机器人控制器内置的web IDE服务(功能:调整参数、标定、可视化配置、扫图和地图操作、任务下发和状态监控、脚本二次开发)。通常有go、python等后端和vue等前端。
2、导航算法(功能:定位建图算法、导航避障控制算法),格式为ros包
3、传感器驱动程序(功能:相机、雷达、IMU等模块),格式为ROS包
4、通讯层程序(功能:将ROS topic和service转为websocket,提供API接口服务,用于与第三方系统通信),格式为ROS包
5、控制模块(功能:接收上层控制指令,实现底层电机等运动控制),格式为ROS包
打包流程
构建机(开发机)和部署机使用同样的处理器型号,所以在开发机完成开发和编译后,可以得到install 目录下的编译结果(可执行文件、库、Python pycache、配置文件等)
1 首先在构建机上执行命令
colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Release
2 创建一个发布包目录,只复制我们需要的编译产物
mkdir -p /tmp/robot_update_pkg_v1.1/install
cp -r install/ /tmp/robot_update_pkg_v1.1/# 删除所有的 .bak 文件(旧的备份)
find /tmp/robot_update_pkg_v1.1/ -name "*.bak" -delete
# 删除所有的编译中间文件(如果在install目录里有的话)
find /tmp/robot_update_pkg_v1.1/ -name "*.o" -delete
find /tmp/robot_update_pkg_v1.1/ -name "*.cmake" -delete
find /tmp/robot_update_pkg_v1.1/ -name "Makefile" -delete
# 删除文档、测试等可能不需要的文件
rm -rf /tmp/robot_update_pkg_v1.1/install/**/test/
rm -rf /tmp/robot_update_pkg_v1.1/install/**/share/doc/
3 打包命令
cd /tmp
tar -czvf robot_update_pkg_v1.1.tar.gz robot_update_pkg_v1.1/
在web页面上传备份包并自动部署
系统架构
- 1 Web上传服务 (ide_web_service):运行在控制器上,提供一个网页界面和API接口,用于接收和保存用户上传的 .bak 更新包。
- 2 自动部署脚本 (auto_deploy.py):作为系统服务(如 systemd)在控制器启动时运行,或在收到Web服务的通知后运行。它负责检查、解压、验证并执行部署。
- 3 更新包结构:.bak 包实际上是一个 .tar.gz 压缩包,包含编译好的 install 目录和部署脚本。
robot_controller/
├── uploads/ # Web服务存放上传的包
│ ├── robot_update_v1.1.tar.gz.bak
│ └── robot_update_v1.2.tar.gz.bak
├── current_version/ # 当前运行的版本(install目录的软链接或拷贝)
│ └── ... (install目录的内容)
├── backups/ # 部署过程中备份的文件
│ └── ...
├── ide_web_service/ # 您的Web服务包
│ └── app/
│ ├── main.py # 这是我们将要修改的Flask应用
│ └── ...
└── auto_deploy.py # 自动部署脚本
第一部分:Web上传服务
这个服务提供上传界面和处理逻辑。
from flask import Flask, request, jsonify, render_template
import os
from werkzeug.utils import secure_filename
import logging
from datetime import datetimeapp = Flask(__name__)# 配置
app.config['UPLOAD_FOLDER'] = '/home/robot/uploads'
app.config['MAX_CONTENT_LENGTH'] = 200 * 1024 * 1024 # 200MB 限制
ALLOWED_EXTENSIONS = {'bak', 'gz'}# 确保上传目录存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = app.loggerdef allowed_file(filename):"""检查文件扩展名是否合法"""return '.' in filename and \filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS@app.route('/')
def index():"""显示上传页面"""return render_template('upload.html')@app.route('/api/upload', methods=['POST'])
def upload_file():"""API接口:处理文件上传"""if 'file' not in request.files:return jsonify({'error': 'No file part'}), 400file = request.files['file']if file.filename == '':return jsonify({'error': 'No selected file'}), 400if file and allowed_file(file.filename):# 生成安全的文件名,并加上时间戳original_filename = secure_filename(file.filename)timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")save_filename = f"{timestamp}_{original_filename}"save_path = os.path.join(app.config['UPLOAD_FOLDER'], save_filename)try:file.save(save_path)logger.info(f"File uploaded successfully: {save_filename}")# 触发自动部署(可选:可以改为由系统服务监听文件变化)# try:# subprocess.run(["python3", "/home/robot/auto_deploy.py", "--file", save_path], check=False, timeout=5)# except Exception as e:# logger.error(f"Failed to trigger auto-deploy: {e}")return jsonify({'message': 'File uploaded successfully!','filename': save_filename,'next_step': 'Please restart the controller to apply the update.'}), 200except Exception as e:logger.error(f"File save failed: {e}")return jsonify({'error': 'File save failed'}), 500else:return jsonify({'error': 'Invalid file type'}), 400@app.route('/api/list_uploads')
def list_uploads():"""API接口:列出所有已上传的更新包"""files = []for f in os.listdir(app.config['UPLOAD_FOLDER']):if f.endswith('.bak'):file_path = os.path.join(app.config['UPLOAD_FOLDER'], f)files.append({'name': f,'size': os.path.getsize(file_path),'mtime': os.path.getmtime(file_path)})# 按修改时间倒序排列files.sort(key=lambda x: x['mtime'], reverse=True)return jsonify(files)if __name__ == '__main__':app.run(host='0.0.0.0', port=5000, debug=False)
对应的HTML模板 (templates/upload.html):
<!DOCTYPE html>
<html>
<head><title>Robot Controller Update</title><style>body { font-family: Arial, sans-serif; margin: 40px; }.upload-form { margin: 20px 0; padding: 20px; border: 1px solid #ccc; }.progress { display: none; margin: 10px 0; }.message { margin: 10px 0; padding: 10px; border-radius: 4px; }.success { background: #d4edda; color: #155724; }.error { background: #f8d7da; color: #721c24; }</style>
</head>
<body><h1>Upload System Update Package</h1><div class="upload-form"><input type="file" id="fileInput" accept=".bak,.gz"><button onclick="uploadFile()">Upload Update Package</button><div id="progress" class="progress">Uploading... <progress id="progressBar" value="0" max="100"></progress></div><div id="message"></div></div><script>async function uploadFile() {const fileInput = document.getElementById('fileInput');const progressDiv = document.getElementById('progress');const progressBar = document.getElementById('progressBar');const messageDiv = document.getElementById('message');if (!fileInput.files[0]) {showMessage('Please select a file first.', 'error');return;}const formData = new FormData();formData.append('file', fileInput.files[0]);try {progressDiv.style.display = 'block';messageDiv.innerHTML = '';const response = await fetch('/api/upload', {method: 'POST',body: formData});const result = await response.json();if (response.ok) {showMessage(`Upload successful! ${result.message} ${result.next_step}`, 'success');} else {showMessage(`Upload failed: ${result.error}`, 'error');}} catch (error) {showMessage('Upload failed: ' + error.message, 'error');} finally {progressDiv.style.display = 'none';}}function showMessage(text, type) {const messageDiv = document.getElementById('message');messageDiv.innerHTML = text;messageDiv.className = `message ${type}`;}</script>
</body>
</html>
第二部分:自动部署脚本 (auto_deploy.py)
这个脚本会在系统启动时运行,检查并部署最新的更新包。
#!/usr/bin/env python3
"""
自动部署脚本:在系统启动时运行,查找并应用最新的更新包
"""
import os
import tarfile
import logging
import shutil
import subprocess
import glob
from datetime import datetime# 配置
UPLOAD_DIR = "/home/robot/uploads"
TARGET_INSTALL_DIR = "/home/robot/ros2_ws/install"
BACKUP_DIR = "/home/robot/backups"
LOG_FILE = "/var/log/auto_deploy.log"# 设置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler(LOG_FILE),logging.StreamHandler()]
)
logger = logging.getLogger(__name__)def find_latest_update_package():"""查找最新的更新包"""pattern = os.path.join(UPLOAD_DIR, "*.bak")update_files = glob.glob(pattern)if not update_files:logger.info("No update packages found.")return None# 按修改时间获取最新的文件latest_file = max(update_files, key=os.path.getmtime)logger.info(f"Found latest update package: {latest_file}")return latest_filedef backup_current_version():"""备份当前运行的版本"""timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")backup_path = os.path.join(BACKUP_DIR, f"backup_{timestamp}")try:os.makedirs(BACKUP_DIR, exist_ok=True)if os.path.exists(TARGET_INSTALL_DIR):shutil.copytree(TARGET_INSTALL_DIR, backup_path)logger.info(f"Backup created at: {backup_path}")return backup_pathelse:logger.warning("Target install directory does not exist, skipping backup.")return Noneexcept Exception as e:logger.error(f"Backup failed: {e}")return Nonedef deploy_update_package(package_path):"""部署更新包"""# 创建临时解压目录extract_dir = "/tmp/update_extract"if os.path.exists(extract_dir):shutil.rmtree(extract_dir)os.makedirs(extract_dir)try:# 解压更新包logger.info(f"Extracting package: {package_path}")with tarfile.open(package_path, 'r:gz') as tar:tar.extractall(path=extract_dir)# 检查解压后的内容extracted_install = os.path.join(extract_dir, "install")if not os.path.exists(extracted_install):logger.error("No 'install' directory found in the update package!")return False# 备份当前版本backup_path = backup_current_version()# 部署新版本:先清空目标目录,然后拷贝新文件if os.path.exists(TARGET_INSTALL_DIR):shutil.rmtree(TARGET_INSTALL_DIR)shutil.copytree(extracted_install, TARGET_INSTALL_DIR)logger.info(f"Update deployed successfully to: {TARGET_INSTALL_DIR}")# 可选:将已部署的包移动到已部署目录或删除deployed_dir = os.path.join(UPLOAD_DIR, "deployed")os.makedirs(deployed_dir, exist_ok=True)shutil.move(package_path, os.path.join(deployed_dir, os.path.basename(package_path)))return Trueexcept Exception as e:logger.error(f"Deployment failed: {e}")# 尝试回滚if backup_path and os.path.exists(backup_path):try:if os.path.exists(TARGET_INSTALL_DIR):shutil.rmtree(TARGET_INSTALL_DIR)shutil.copytree(backup_path, TARGET_INSTALL_DIR)logger.info("Rollback to backup completed due to deployment failure.")except Exception as rollback_error:logger.error(f"Rollback also failed: {rollback_error}")return Falsefinally:# 清理临时目录if os.path.exists(extract_dir):shutil.rmtree(extract_dir)def main():logger.info("=== Auto Deployment Script Started ===")# 查找最新更新包latest_package = find_latest_update_package()if not latest_package:logger.info("No updates to deploy.")return# 部署更新success = deploy_update_package(latest_package)if success:logger.info("Update deployed successfully! Please restart ROS nodes.")# 这里可以添加自动重启ROS节点的逻辑# try:# subprocess.run(["systemctl", "restart", "robot-core.service"], check=True)# except Exception as e:# logger.error(f"Failed to restart service: {e}")else:logger.error("Update deployment failed!")logger.info("=== Auto Deployment Script Finished ===")if __name__ == "__main__":main()
第三部分:系统服务配置
创建systemd服务,让自动部署脚本在启动时运行。
创建服务文件 /etc/systemd/system/auto-deploy.service:
[Unit]
Description=Robot Auto Deployment Service
After=network.target
Wants=network.target[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/bin/python3 /home/robot/auto_deploy.py
User=robot
Group=robot
WorkingDirectory=/home/robot[Install]
WantedBy=multi-user.target
启用服务:
sudo systemctl daemon-reload
sudo systemctl enable auto-deploy.service
完整工作流程
- 用户操作:在IDE的网页界面中上传 robot_update_v1.2.tar.gz.bak 文件。
- Web服务:接收文件,保存到 /home/robot/uploads/ 目录。
- 重启控制器:用户通过网页或物理方式重启控制器。
- 自动部署:
- 系统启动时,auto-deploy.service 运行 auto_deploy.py。
- 脚本查找最新的 .bak 包,解压并部署到 install 目录。
- 部署成功后,自动重启ROS节点(可选)。 - 状态验证:用户通过Web界面或ROS工具验证新版本是否正常运行。