1. 监控架构核心组件
1.1 日志集中管理
设计目标:聚合所有节点的运行日志,支持实时查询与异常分析。 实现方式:
日志采集:各节点通过
logging
模块将日志发送至中央存储(如Elasticsearch或Redis)。日志分类:区分任务日志(如URL爬取状态)、系统日志(如节点资源占用)和错误日志(如反爬拦截)。
滚动策略:按时间或文件大小分割日志,避免存储爆炸。
代码示例(Python + Elasticsearch):
# 节点日志发送模块
from elasticsearch import Elasticsearch
import logging
from logging.handlers import RotatingFileHandler
import time
es = Elasticsearch(hosts=["monitor-node:9200"])
logger = logging.getLogger("spider_node")
logger.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler("spider.log", maxBytes=1e6, backupCount=5)
logger.addHandler(file_handler)
# 自定义Handler将日志同步到Elasticsearch
class ElasticsearchHandler(logging.Handler):
def emit(self, record):
log_entry = {
"timestamp": record.created,
"message": record.msg,
"level": record.levelname,
"module": record.module,
"process": record.process
}
es.index(index="spider-logs", body=log_entry)
logger.addHandler(ElasticsearchHandler())
# 模拟日志生成
for _ in range(10):
logger.info("This is an info message")
logger.error("This is an error message")
time.sleep(1)
1.2 节点健康检测
心跳机制:从节点周期性向监控中心发送心跳包(如HTTP请求或Redis Key刷新)。 故障判定:若连续3个周期未收到心跳,标记节点为宕机并触发任务重新分配。
代码示例(Redis心跳检测):
# 从节点心跳发送
import redis
import time
import uuid
node_id = str(uuid.uuid4()) # 生成唯一节点ID
r = redis.Redis(host='master-node', port=6379, db=0)
while True:
r.setex(f"node_heartbeat:{node_id}", 60, "alive") # 60秒有效期
print(f"Node {node_id} sent heartbeat")
time.sleep(30) # 每30秒发送一次
# 监控中心检测脚本
def check_nodes():
alive_nodes = []
for node_id in r.keys("node_heartbeat:*"):
node_id = node_id.decode().split(":")[1]
if r.exists(f"node_heartbeat:{node_id}"):
alive_nodes.append(node_id)
else:
reassign_failed_tasks(node_id) # 重新分配该节点未完成任务
return alive_nodes
def reassign_failed_tasks(node_id):
failed_tasks = r.lrange(f"node_tasks:{node_id}", 0, -1)
if failed_tasks:
r.lpush("task_queue", *failed_tasks) # 重新加入全局队列
print(f"Reassigned tasks for node {node_id}")
# 模拟节点检测
check_nodes()
1.3 任务队列监控
队列状态:监控Redis或RabbitMQ中的任务队列长度、消费速率。 积压告警:当待处理任务数超过阈值时触发告警(如Slack通知)。
代码示例(RabbitMQ队列监控):
# 使用RabbitMQ API获取队列状态
import requests
import time
RABBITMQ_API_URL = "http://rabbitmq:15672/api/"
QUEUE_NAME = "task_queue"
def get_queue_status():
auth = ("admin", "password")
response = requests.get(f"{RABBITMQ_API_URL}/queues/%2F/{QUEUE_NAME}", auth=auth)
return response.json()
def monitor_queue():
while True:
status = get_queue_status()
messages = status.get("messages", 0)
if messages > 1000:
send_alert(f"警告:队列 {QUEUE_NAME} 积压任务数: {messages}")
time.sleep(10) # 每10秒检查一次
def send_alert(message):
# Slack通知示例
slack_webhook_url = "https://hooks.slack.com/services/..."
payload = {"text": message}
requests.post(slack_webhook_url, json=payload)
# 启动监控
monitor_queue()
1.4 性能指标采集
指标类型:CPU/内存占用、网络IO、任务吞吐量(如每秒处理URL数)。 工具链:Prometheus + Grafana实现指标采集与可视化。
代码示例(Prometheus Client):
# 节点暴露指标端点
from prometheus_client import start_http_server, Gauge
import psutil
import time
import random
# 启动HTTP服务,暴露/metrics端点
start_http_server(8000)
# 定义指标
cpu_usage = Gauge('spider_cpu_usage_percent', 'CPU使用率百分比')
memory_usage = Gauge('spider_memory_usage_mb', '内存使用量(MB)')
task_throughput = Gauge('spider_task_throughput_per_sec', '每秒处理任务数')
def collect_metrics():
while True:
# 采集CPU使用率
cpu_usage.set(psutil.cpu_percent())
# 采集内存使用量
memory_info = psutil.virtual_memory()
memory_usage.set(memory_info.used / (1024 * 1024)) # 转换为MB
# 模拟任务吞吐量采集
task_throughput.set(random.randint(10, 100))
time.sleep(5) # 每5秒采集一次
if __name__ == '__main__':
collect_metrics()
2. 监控架构整合设计
2.1 架构图
+------------------+ +------------------+
| 爬虫节点 | | 监控中心 |
| - 心跳发送 |<----->| - 心跳检测 |
| - 日志上报 | | - 日志存储(ES) |
| - 指标暴露 | | - Prometheus |
+------------------+ +------------------+
↓ ↑
+------------------+ +------------------+
| 消息队列 | | 可视化面板 |
| - 任务积压监控 |------>| - Grafana |
+------------------+ +------------------+
2.2 告警策略
分级告警:
紧急级:节点宕机、任务队列持续积压。
警告级:CPU持续超80%、反爬触发频率过高。 通知渠道:邮件、Slack、Webhook。
2.3 容错与恢复增强
自动故障转移:当节点宕机时,监控中心通过Redis的BLPOP命令重新分配未完成任务至其他节点。 数据一致性校验:使用MongoDB的副本集或Redis事务保证去重数据一致性。
代码示例(任务重新分配):
# Redis任务重新分配逻辑
def reassign_failed_tasks(node_id):
# 获取失败节点的任务
failed_tasks = r.lrange(f"node_tasks:{node_id}", 0, -1)
if failed_tasks:
# 将任务重新加入全局队列
r.lpush("task_queue", *failed_tasks)
print(f"Reassigned {len(failed_tasks)} tasks from node {node_id}")
代码示例(URL去重写入):
# URL去重写入(原子操作)
def add_url_to_visited(url):
with r.pipeline() as pipe:
while True:
try:
pipe.watch("visited_urls")
if not pipe.sismember("visited_urls", url):
pipe.multi()
pipe.sadd("visited_urls", url)
pipe.execute()
return True
else:
return False
except redis.WatchError:
continue
# 测试URL去重
urls_to_add = ["http://example.com/1", "http://example.com/2", "http://example.com/1"]
for url in urls_to_add:
if add_url_to_visited(url):
print(f"Added new URL: {url}")
else:
print(f"URL already exists: {url}")