1、实现功能
通过结合 Python 的 watchdog 库(类似 Linux 的 inotify 机制)和 rsync 命令,实现了文件系统变化的实时监控和增量同步。下面详细解释其工作原理和运行方式:
2、核心工作原理
2.1、文件监控
使用watchdog库监控源目录的文件变化(创建、修改、删除、移动)。
1)监听事件类型:modified、created、deleted、moved;
2)所有变更会被收集到pending_changes集合中,避免重复处理。
2.2、增量同步
通过rsync命令将变化的文件同步到远程服务器:
1)使用--delete参数确保目标目录与源目录完全一致。
2)通过--exclude参数支持忽略特定文件或目录(如临时文件、日志)。
2.3、事件合并
设置 1 秒的延迟(sync_delay),将短时间内的多次变更合并为一次同步操作,减少不必要的网络传输。
3、环境准备
Step1、python环境
# 1. 安装 rsync
sudo apt update -y
sudo apt install rsync -y
# 2. 验证安装
rsync --version
Step2、python环境
pip install watchdog # 用于文件监控
Step3、主机环境
SSH相关操作:
将本机公钥内容添加到被连接的目标主机 ~/.ssh/authorized_keys 文件中。其中,authorized_keys文件权限要可读写;
# SSH秘钥对生成
ssh-keygen -t rsa -b 4096 -C "your_name" # 生成ssh-key
# 如何实现ssh免密登录
方案1:
ssh-copy-id -i id_rsa_linux.pub name@ip # 将本机公钥内容就添加到服务器authorized_keys文件中了,name@ip为远程主机用户名、IP
方案2:
将本机公钥id_rsa.pub拷贝到远程目标主机
cat id_rsa.pub >> ~/.ssh/authorized_keys # 将公钥添加到authorized_keys
chmod -R 600 ~/.ssh/authorized_keys # 添加权限
# SSH秘钥连接测试是否成功
ssh name@ip
4、示例代码
#!/usr/bin/env python3
# coding: utf-8
"""
pip install watchdog # 用于文件监控
sudo apt install rsync -y
"""
import os
import time
import subprocess
import logging
import argparse
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, EVENT_TYPE_MODIFIED, EVENT_TYPE_CREATED, \
EVENT_TYPE_DELETED, EVENT_TYPE_MOVED
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("sync_monitor.log"),
logging.StreamHandler()
]
)
class RsyncHandler(FileSystemEventHandler):
def __init__(self, source_dir, target_host, target_dir, exclude_patterns=None, ssh_key=None):
self.source_dir = os.path.abspath(source_dir)
self.target_host = target_host
self.target_dir = target_dir
self.exclude_patterns = exclude_patterns or []
self.ssh_key = ssh_key
self.last_sync_time = time.time()
self.pending_changes = set()
self.sync_delay = 1 # 合并1秒内的变更
def enqueue_change(self, event):
"""将变更加入待处理队列"""
path = event.src_path
if isinstance(event, FileSystemMovedEvent):
path = event.dest_path
rel_path = os.path.relpath(path, self.source_dir)
self.pending_changes.add(rel_path)
# 安排延迟同步,合并短时间内的多次变更
if time.time() - self.last_sync_time > self.sync_delay:
self.perform_sync()
def on_modified(self, event):
if not event.is_directory:
logging.info(f"修改: {event.src_path}")
self.enqueue_change(event)
def on_created(self, event):
logging.info(f"创建: {event.src_path}")
self.enqueue_change(event)
def on_deleted(self, event):
logging.info(f"删除: {event.src_path}")
self.enqueue_change(event)
def on_moved(self, event):
logging.info(f"移动: {event.src_path} -> {event.dest_path}")
self.enqueue_change(event)
def build_rsync_command(self):
"""构建rsync命令"""
cmd = ["rsync", "-avz", "--delete"]
# 添加排除模式
for pattern in self.exclude_patterns:
cmd.extend(["--exclude", pattern])
# 添加SSH密钥选项
if self.ssh_key:
cmd.extend(["-e", f"ssh -i {self.ssh_key}"])
# 添加源目录和目标
cmd.extend([f"{self.source_dir}/", f"{self.target_host}:{self.target_dir}/"])
return cmd
def perform_sync(self):
"""执行rsync同步"""
if not self.pending_changes:
return
logging.info(f"同步变更: {', '.join(self.pending_changes)}")
self.pending_changes.clear()
try:
cmd = self.build_rsync_command()
logging.debug(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
logging.info(f"同步成功: {result.stdout.strip()}")
self.last_sync_time = time.time()
except subprocess.CalledProcessError as e:
logging.error(f"同步失败: {e.stderr.strip()}")
except Exception as e:
logging.error(f"发生错误: {str(e)}")
def main():
parser = argparse.ArgumentParser(description="文件同步监控工具 (Inotify + Rsync)")
parser.add_argument("source", help="源目录路径")
parser.add_argument("target_host", help="目标主机 (user@host)")
parser.add_argument("target_dir", help="目标目录路径")
parser.add_argument("-e", "--exclude", action="append", help="排除模式 (可重复使用)")
parser.add_argument("-k", "--ssh-key", help="SSH私钥路径")
parser.add_argument("-i", "--initial-sync", action="store_true", help="启动时执行初始同步")
args = parser.parse_args()
# 验证源目录存在
source_dir = os.path.abspath(args.source)
if not os.path.isdir(source_dir):
logging.error(f"源目录不存在: {source_dir}")
return
logging.info(f"开始监控目录: {source_dir}")
logging.info(f"目标服务器: {args.target_host}:{args.target_dir}")
# 创建事件处理器
event_handler = RsyncHandler(
source_dir,
args.target_host,
args.target_dir,
args.exclude,
args.ssh_key
)
# 执行初始同步
if args.initial_sync:
logging.info("执行初始同步...")
event_handler.perform_sync()
# 创建观察者
observer = Observer()
observer.schedule(event_handler, path=source_dir, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
main()