为什么做备份更新
为机器人控制器设计一套打包备份更新机制,为控制器的批量生产和产品与项目落地做准备。
当某个模块出现bug需要升级时,用户可以快速获取正确的bak包并导入到控制器中重启生效。
如果没有做好软件的备份更新机制,解决问题时,需要重新烧录整个系统、或者费时费力地从源代码开始找问题然后修改编译,期间系统完全瘫痪。
哪些包计划更新
1、机器人控制器内置的web IDE服务(功能:调整参数、标定、可视化配置、扫图和地图操作、任务下发和状态监控、脚本二次开发)。通常有go、python等后端和vue等前端。
2、导航算法(功能:定位建图算法、导航避障控制算法),格式为ros包
3、传感器驱动程序(功能:相机、雷达、IMU等模块),格式为ROS包
4、通讯层程序(功能:将ROS topic和service转为websocket,提供API接口服务,用于与第三方系统通信),格式为ROS包
5、控制模块(功能:接收上层控制指令,实现底层电机等运动控制),格式为ROS包
打包流程
构建机(开发机)和部署机使用同样的处理器型号,所以在开发机完成开发和编译后,可以得到install 目录下的编译结果(可执行文件、库、Python pycache、配置文件等)
1 首先在构建机上执行命令
colcon build --symlink-install --cmake-args -DCMAKE_BUILD_TYPE=Release
2 创建一个发布包目录,只复制我们需要的编译产物
mkdir -p /tmp/robot_update_pkg_v1.1/install
cp -r install/ /tmp/robot_update_pkg_v1.1/
# 删除所有的 .bak 文件(旧的备份)
find /tmp/robot_update_pkg_v1.1/ -name "*.bak" -delete
# 删除所有的编译中间文件(如果在install目录里有的话)
find /tmp/robot_update_pkg_v1.1/ -name "*.o" -delete
find /tmp/robot_update_pkg_v1.1/ -name "*.cmake" -delete
find /tmp/robot_update_pkg_v1.1/ -name "Makefile" -delete
# 删除文档、测试等可能不需要的文件
rm -rf /tmp/robot_update_pkg_v1.1/install/**/test/
rm -rf /tmp/robot_update_pkg_v1.1/install/**/share/doc/
3 打包命令
cd /tmp
tar -czvf robot_update_pkg_v1.1.tar.gz robot_update_pkg_v1.1/
在web页面上传备份包并自动部署
系统架构
- 1 Web上传服务 (ide_web_service):运行在控制器上,提供一个网页界面和API接口,用于接收和保存用户上传的 .bak 更新包。
- 2 自动部署脚本 (auto_deploy.py):作为系统服务(如 systemd)在控制器启动时运行,或在收到Web服务的通知后运行。它负责检查、解压、验证并执行部署。
- 3 更新包结构:.bak 包实际上是一个 .tar.gz 压缩包,包含编译好的 install 目录和部署脚本。
robot_controller/
├── uploads/ # Web服务存放上传的包
│ ├── robot_update_v1.1.tar.gz.bak
│ └── robot_update_v1.2.tar.gz.bak
├── current_version/ # 当前运行的版本(install目录的软链接或拷贝)
│ └── ... (install目录的内容)
├── backups/ # 部署过程中备份的文件
│ └── ...
├── ide_web_service/ # 您的Web服务包
│ └── app/
│ ├── main.py # 这是我们将要修改的Flask应用
│ └── ...
└── auto_deploy.py # 自动部署脚本
第一部分:Web上传服务
这个服务提供上传界面和处理逻辑。
from flask import Flask, request, jsonify, render_template
import os
from werkzeug.utils import secure_filename
import logging
from datetime import datetime
app = Flask(__name__)
# 配置
app.config['UPLOAD_FOLDER'] = '/home/robot/uploads'
app.config['MAX_CONTENT_LENGTH'] = 200 * 1024 * 1024 # 200MB 限制
ALLOWED_EXTENSIONS = {'bak', 'gz'}
# 确保上传目录存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# 设置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = app.logger
def allowed_file(filename):
"""检查文件扩展名是否合法"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/')
def index():
"""显示上传页面"""
return render_template('upload.html')
@app.route('/api/upload', methods=['POST'])
def upload_file():
"""API接口:处理文件上传"""
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if file and allowed_file(file.filename):
# 生成安全的文件名,并加上时间戳
original_filename = secure_filename(file.filename)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
save_filename = f"{timestamp}_{original_filename}"
save_path = os.path.join(app.config['UPLOAD_FOLDER'], save_filename)
try:
file.save(save_path)
logger.info(f"File uploaded successfully: {save_filename}")
# 触发自动部署(可选:可以改为由系统服务监听文件变化)
# try:
# subprocess.run(["python3", "/home/robot/auto_deploy.py", "--file", save_path], check=False, timeout=5)
# except Exception as e:
# logger.error(f"Failed to trigger auto-deploy: {e}")
return jsonify({
'message': 'File uploaded successfully!',
'filename': save_filename,
'next_step': 'Please restart the controller to apply the update.'
}), 200
except Exception as e:
logger.error(f"File save failed: {e}")
return jsonify({'error': 'File save failed'}), 500
else:
return jsonify({'error': 'Invalid file type'}), 400
@app.route('/api/list_uploads')
def list_uploads():
"""API接口:列出所有已上传的更新包"""
files = []
for f in os.listdir(app.config['UPLOAD_FOLDER']):
if f.endswith('.bak'):
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f)
files.append({
'name': f,
'size': os.path.getsize(file_path),
'mtime': os.path.getmtime(file_path)
})
# 按修改时间倒序排列
files.sort(key=lambda x: x['mtime'], reverse=True)
return jsonify(files)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
对应的HTML模板 (templates/upload.html):
<!DOCTYPE html>
<html>
<head>
<title>Robot Controller Update</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.upload-form { margin: 20px 0; padding: 20px; border: 1px solid #ccc; }
.progress { display: none; margin: 10px 0; }
.message { margin: 10px 0; padding: 10px; border-radius: 4px; }
.success { background: #d4edda; color: #155724; }
.error { background: #f8d7da; color: #721c24; }
</style>
</head>
<body>
<h1>Upload System Update Package</h1>
<div class="upload-form">
<input type="file" id="fileInput" accept=".bak,.gz">
<button onclick="uploadFile()">Upload Update Package</button>
<div id="progress" class="progress">Uploading... <progress id="progressBar" value="0" max="100"></progress></div>
<div id="message"></div>
</div>
<script>
async function uploadFile() {
const fileInput = document.getElementById('fileInput');
const progressDiv = document.getElementById('progress');
const progressBar = document.getElementById('progressBar');
const messageDiv = document.getElementById('message');
if (!fileInput.files[0]) {
showMessage('Please select a file first.', 'error');
return;
}
const formData = new FormData();
formData.append('file', fileInput.files[0]);
try {
progressDiv.style.display = 'block';
messageDiv.innerHTML = '';
const response = await fetch('/api/upload', {
method: 'POST',
body: formData
});
const result = await response.json();
if (response.ok) {
showMessage(`Upload successful! ${result.message} ${result.next_step}`, 'success');
} else {
showMessage(`Upload failed: ${result.error}`, 'error');
}
} catch (error) {
showMessage('Upload failed: ' + error.message, 'error');
} finally {
progressDiv.style.display = 'none';
}
}
function showMessage(text, type) {
const messageDiv = document.getElementById('message');
messageDiv.innerHTML = text;
messageDiv.className = `message ${type}`;
}
</script>
</body>
</html>
第二部分:自动部署脚本 (auto_deploy.py)
这个脚本会在系统启动时运行,检查并部署最新的更新包。
#!/usr/bin/env python3
"""
自动部署脚本:在系统启动时运行,查找并应用最新的更新包
"""
import os
import tarfile
import logging
import shutil
import subprocess
import glob
from datetime import datetime
# 配置
UPLOAD_DIR = "/home/robot/uploads"
TARGET_INSTALL_DIR = "/home/robot/ros2_ws/install"
BACKUP_DIR = "/home/robot/backups"
LOG_FILE = "/var/log/auto_deploy.log"
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def find_latest_update_package():
"""查找最新的更新包"""
pattern = os.path.join(UPLOAD_DIR, "*.bak")
update_files = glob.glob(pattern)
if not update_files:
logger.info("No update packages found.")
return None
# 按修改时间获取最新的文件
latest_file = max(update_files, key=os.path.getmtime)
logger.info(f"Found latest update package: {latest_file}")
return latest_file
def backup_current_version():
"""备份当前运行的版本"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(BACKUP_DIR, f"backup_{timestamp}")
try:
os.makedirs(BACKUP_DIR, exist_ok=True)
if os.path.exists(TARGET_INSTALL_DIR):
shutil.copytree(TARGET_INSTALL_DIR, backup_path)
logger.info(f"Backup created at: {backup_path}")
return backup_path
else:
logger.warning("Target install directory does not exist, skipping backup.")
return None
except Exception as e:
logger.error(f"Backup failed: {e}")
return None
def deploy_update_package(package_path):
"""部署更新包"""
# 创建临时解压目录
extract_dir = "/tmp/update_extract"
if os.path.exists(extract_dir):
shutil.rmtree(extract_dir)
os.makedirs(extract_dir)
try:
# 解压更新包
logger.info(f"Extracting package: {package_path}")
with tarfile.open(package_path, 'r:gz') as tar:
tar.extractall(path=extract_dir)
# 检查解压后的内容
extracted_install = os.path.join(extract_dir, "install")
if not os.path.exists(extracted_install):
logger.error("No 'install' directory found in the update package!")
return False
# 备份当前版本
backup_path = backup_current_version()
# 部署新版本:先清空目标目录,然后拷贝新文件
if os.path.exists(TARGET_INSTALL_DIR):
shutil.rmtree(TARGET_INSTALL_DIR)
shutil.copytree(extracted_install, TARGET_INSTALL_DIR)
logger.info(f"Update deployed successfully to: {TARGET_INSTALL_DIR}")
# 可选:将已部署的包移动到已部署目录或删除
deployed_dir = os.path.join(UPLOAD_DIR, "deployed")
os.makedirs(deployed_dir, exist_ok=True)
shutil.move(package_path, os.path.join(deployed_dir, os.path.basename(package_path)))
return True
except Exception as e:
logger.error(f"Deployment failed: {e}")
# 尝试回滚
if backup_path and os.path.exists(backup_path):
try:
if os.path.exists(TARGET_INSTALL_DIR):
shutil.rmtree(TARGET_INSTALL_DIR)
shutil.copytree(backup_path, TARGET_INSTALL_DIR)
logger.info("Rollback to backup completed due to deployment failure.")
except Exception as rollback_error:
logger.error(f"Rollback also failed: {rollback_error}")
return False
finally:
# 清理临时目录
if os.path.exists(extract_dir):
shutil.rmtree(extract_dir)
def main():
logger.info("=== Auto Deployment Script Started ===")
# 查找最新更新包
latest_package = find_latest_update_package()
if not latest_package:
logger.info("No updates to deploy.")
return
# 部署更新
success = deploy_update_package(latest_package)
if success:
logger.info("Update deployed successfully! Please restart ROS nodes.")
# 这里可以添加自动重启ROS节点的逻辑
# try:
# subprocess.run(["systemctl", "restart", "robot-core.service"], check=True)
# except Exception as e:
# logger.error(f"Failed to restart service: {e}")
else:
logger.error("Update deployment failed!")
logger.info("=== Auto Deployment Script Finished ===")
if __name__ == "__main__":
main()
第三部分:系统服务配置
创建systemd服务,让自动部署脚本在启动时运行。
创建服务文件 /etc/systemd/system/auto-deploy.service:
[Unit]
Description=Robot Auto Deployment Service
After=network.target
Wants=network.target
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/bin/python3 /home/robot/auto_deploy.py
User=robot
Group=robot
WorkingDirectory=/home/robot
[Install]
WantedBy=multi-user.target
启用服务:
sudo systemctl daemon-reload
sudo systemctl enable auto-deploy.service
完整工作流程
- 用户操作:在IDE的网页界面中上传 robot_update_v1.2.tar.gz.bak 文件。
- Web服务:接收文件,保存到 /home/robot/uploads/ 目录。
- 重启控制器:用户通过网页或物理方式重启控制器。
- 自动部署:
- 系统启动时,auto-deploy.service 运行 auto_deploy.py。
- 脚本查找最新的 .bak 包,解压并部署到 install 目录。
- 部署成功后,自动重启ROS节点(可选)。 - 状态验证:用户通过Web界面或ROS工具验证新版本是否正常运行。