AI Quality Risk Prediction and Decision Support System for the Construction Industry
System Overview
This system is an AI-driven quality risk prediction and decision support platform for the construction industry. It applies data analysis and machine learning to identify potential quality risks early and to produce actionable mitigation plans. The system also provides cross-platform synchronization, supporting data access and collaboration across multiple client devices.
System Architecture Design
Overall Architecture
The system uses a microservice architecture composed of the following core modules (a minimal interface sketch follows the list):
- Data collection and integration module
- Quality risk prediction engine
- Decision recommendation generation module
- Standardized quality data management
- Visual analytics platform
- Cross-platform synchronization service
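The modules communicate over the REST and WebSocket interfaces listed in the technology stack below. As a rough orientation, here is a minimal sketch of the lifecycle contract each module might expose; the Protocol names and methods are illustrative assumptions, not the actual service interfaces.
from typing import Any, Dict, List, Protocol
class CoreModule(Protocol):
    """Hypothetical lifecycle contract shared by the six core modules."""
    async def start(self) -> None: ...   # bring the service up
    async def stop(self) -> None: ...    # graceful shutdown
class RiskPredictionEngine(CoreModule, Protocol):
    """Illustrative scoring interface of the prediction engine."""
    def score(self, records: List[Dict[str, Any]]) -> List[float]: ...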
Technology Stack
- Backend framework: FastAPI + PostgreSQL
- Machine learning: Scikit-learn, TensorFlow, PyTorch
- Data processing: Pandas, NumPy
- Visualization: Matplotlib, Seaborn, Plotly
- Cross-platform sync: WebSocket + RESTful API
- Deployment: Docker + Kubernetes
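A requirements.txt along the following lines covers the libraries used throughout this document. This is a sketch: versions are deliberately unpinned here and should be pinned for production builds.
# requirements.txt (sketch)
fastapi
uvicorn
pandas
numpy
scikit-learn
tensorflow
plotly
matplotlib
seaborn
aiohttp
redis
websockets
PyYAML
jsonschema
joblib
prometheus-client
psycopg2-binary  # PostgreSQL driver implied by libpq-dev in the Dockerfile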
Core Module Implementation
1. Data Collection and Integration Module
import pandas as pd
import numpy as np
from datetime import datetime
import requests
import json
from typing import Dict, List, Any
import asyncio
import aiohttp
class DataCollector:
def __init__(self, db_config: Dict[str, Any]):
self.db_config = db_config
self.sensor_data_queue = asyncio.Queue()
self.manual_data_queue = asyncio.Queue()
self.document_data_queue = asyncio.Queue()
    async def collect_sensor_data(self, sensor_endpoints: List[str]):
        """Collect sensor data from a list of HTTP endpoints."""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for endpoint in sensor_endpoints:
                task = asyncio.create_task(self._fetch_sensor_data(session, endpoint))
                tasks.append(task)
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                # Skip failed fetches (exceptions) and empty results (None)
                if result is not None and not isinstance(result, Exception):
                    await self.sensor_data_queue.put(result)
    async def _fetch_sensor_data(self, session: aiohttp.ClientSession, endpoint: str):
        """Fetch a single sensor reading."""
        try:
            async with session.get(endpoint, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    data = await response.json()
                    return {
                        'sensor_id': data.get('sensor_id'),
                        'timestamp': datetime.now(),
                        'value': data.get('value'),
                        'type': data.get('type'),
                        'unit': data.get('unit')
                    }
        except Exception as e:
            print(f"Error fetching sensor data from {endpoint}: {e}")
        return None
    async def collect_manual_data(self, form_data: Dict[str, Any]):
        """Collect manually entered inspection data."""
        # Validate the payload; raises ValueError on missing fields
        validated_data = self._validate_manual_data(form_data)
        if validated_data:
            await self.manual_data_queue.put(validated_data)
    def _validate_manual_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate manually entered data."""
        required_fields = ['project_id', 'inspector_id', 'check_item', 'result', 'timestamp']
        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing required field: {field}")
        # Attach a data-quality tag
        data['data_quality'] = self._assess_data_quality(data)
        return data
    def _assess_data_quality(self, data: Dict[str, Any]) -> str:
        """Assess the quality of a manual record with a simple additive score."""
        score = 0
        if 'photos' in data and len(data['photos']) > 0:
            score += 20
        if 'comments' in data and len(data['comments']) > 10:
            score += 20
        if 'measurements' in data and len(data['measurements']) > 0:
            score += 30
        if 'signature' in data:
            score += 30
        if score >= 80:
            return "excellent"
        elif score >= 60:
            return "good"
        elif score >= 40:
            return "fair"
        else:
            return "poor"
    async def integrate_data(self):
        """Integrate data from all sources on a fixed schedule."""
        while True:
            # Drain a batch from each queue (batch helpers omitted here)
            sensor_data = await self._get_sensor_data_batch()
            manual_data = await self._get_manual_data_batch()
            document_data = await self._get_document_data_batch()
            # Fuse the multi-source batches
            integrated_data = await self._fuse_data(sensor_data, manual_data, document_data)
            # Persist the integrated records
            await self._store_integrated_data(integrated_data)
            # Run one integration pass per minute
            await asyncio.sleep(60)
    async def _fuse_data(self, sensor_data: List[Dict], manual_data: List[Dict],
                         document_data: List[Dict]) -> List[Dict]:
        """Fuse multi-source data by aligning records in time and space."""
        fused_data = []
        for manual_item in manual_data:
            fused_item = manual_item.copy()
            # Find sensor readings close in time and location
            related_sensor_data = self._find_related_sensor_data(
                manual_item, sensor_data
            )
            if related_sensor_data:
                fused_item['sensor_readings'] = related_sensor_data
            # Link related documents (lookup helper omitted here)
            related_documents = self._find_related_documents(
                manual_item, document_data
            )
            if related_documents:
                fused_item['related_documents'] = related_documents
            fused_data.append(fused_item)
        return fused_data
    def _find_related_sensor_data(self, manual_item: Dict, sensor_data: List[Dict]) -> List[Dict]:
        """Find sensor readings related to a manual record by time and location."""
        related_data = []
        manual_time = manual_item['timestamp']
        manual_location = manual_item.get('location')
        for sensor_item in sensor_data:
            sensor_time = sensor_item['timestamp']
            time_diff = abs((manual_time - sensor_time).total_seconds())
            if time_diff < 300:  # 5-minute time window
                if manual_location and 'location' in sensor_item:
                    # Simplified planar distance check
                    distance = self._calculate_distance(manual_location, sensor_item['location'])
                    if distance < 50:  # within 50 meters
                        related_data.append(sensor_item)
                else:
                    # No location available; match on time alone
                    related_data.append(sensor_item)
        return related_data
    def _calculate_distance(self, loc1: Dict[str, float], loc2: Dict[str, float]) -> float:
        """Euclidean distance between two site coordinates (simplified)."""
        # Production code should use the Haversine formula or a projected CRS
        dx = loc1['x'] - loc2['x']
        dy = loc1['y'] - loc2['y']
        return (dx**2 + dy**2)**0.5
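A minimal driver for the collector might look like the following sketch. The db_config values and sensor endpoint URLs are illustrative placeholders; failed fetches are swallowed inside _fetch_sensor_data, so the script runs even without real gateways.
import asyncio
async def main():
    collector = DataCollector(db_config={"host": "localhost", "port": 5432})  # placeholder config
    await collector.collect_sensor_data([
        "http://site-gateway.local/sensors/temp-01",    # hypothetical endpoint
        "http://site-gateway.local/sensors/strain-07",  # hypothetical endpoint
    ])
    print(collector.sensor_data_queue.qsize(), "readings queued")
if __name__ == "__main__":
    asyncio.run(main())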
2. Quality Risk Prediction Engine
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import joblib
import pickle
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
import logging
class QualityRiskPredictor:
def __init__(self, model_path: str = None):
self.model = None
self.scaler = StandardScaler()
self.feature_columns = []
self.model_path = model_path or "models/quality_risk_predictor"
self.logger = logging.getLogger(__name__)
        # Load an existing model or initialize a new one
        self._load_or_init_model()
    def _load_or_init_model(self):
        """Load a persisted model, or initialize a fresh one."""
        try:
            self.model = joblib.load(f"{self.model_path}.pkl")
            with open(f"{self.model_path}_features.pkl", 'rb') as f:
                self.feature_columns = pickle.load(f)
            self.logger.info("Loaded existing model")
        except FileNotFoundError:
            self.logger.info("Initializing new model")
            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                random_state=42,
                class_weight='balanced'
            )
    def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Preprocess quality data into the model's feature matrix."""
        df = data.copy()
        # Handle missing values
        df = self._handle_missing_values(df)
        # Feature engineering
        df = self._feature_engineering(df)
        # Lock in the feature columns on first use
        if not self.feature_columns:
            self.feature_columns = self._select_features(df)
        # Ensure every expected feature exists
        for col in self.feature_columns:
            if col not in df.columns:
                df[col] = 0  # add the missing feature column with a default value
        return df[self.feature_columns]
    def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values."""
        # Fill numeric features with the median
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df[col] = df[col].fillna(df[col].median())
        # Fill categorical features with the mode
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            df[col] = df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else "Unknown")
        return df
    def _feature_engineering(self, df: pd.DataFrame) -> pd.DataFrame:
        """Feature engineering."""
        # Time-based features
        if 'timestamp' in df.columns:
            df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
            df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek
            df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        # Rolling statistics per sensor
        if 'value' in df.columns:
            df['value_rolling_mean'] = df.groupby('sensor_id')['value'].transform(
                lambda x: x.rolling(window=5, min_periods=1).mean()
            )
            df['value_rolling_std'] = df.groupby('sensor_id')['value'].transform(
                lambda x: x.rolling(window=5, min_periods=1).std()
            ).fillna(0)  # the std of a single sample is NaN; treat it as no variation
        # Domain-specific engineering features
        if all(col in df.columns for col in ['temperature', 'humidity']):
            df['temp_humidity_ratio'] = df['temperature'] / (df['humidity'] + 1)  # avoid division by zero
        return df
    def _select_features(self, df: pd.DataFrame) -> List[str]:
        """Select feature columns."""
        # Chosen from domain knowledge and correlation analysis
        base_features = [
            'value', 'value_rolling_mean', 'value_rolling_std',
            'hour', 'day_of_week', 'is_weekend'
        ]
        # Keep the base features that are present
        available_features = [col for col in base_features if col in df.columns]
        # Add remaining numeric features (excluding the target variables)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        additional_features = [col for col in numeric_cols
                               if col not in available_features + ['risk_level', 'has_issue']]
        return available_features + additional_features[:10]  # cap the feature count
    def train(self, X: pd.DataFrame, y: pd.Series, test_size: float = 0.2):
        """Train the model."""
        # Preprocess
        X_processed = self.preprocess_data(X)
        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X_processed, y, test_size=test_size, random_state=42, stratify=y
        )
        # Standardize features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        # Fit
        self.model.fit(X_train_scaled, y_train)
        # Evaluate
        train_score = self.model.score(X_train_scaled, y_train)
        test_score = self.model.score(X_test_scaled, y_test)
        self.logger.info(f"Training accuracy: {train_score:.4f}")
        self.logger.info(f"Testing accuracy: {test_score:.4f}")
        # Persist
        self._save_model()
        return train_score, test_score
    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Predict risk classes."""
        X_processed = self.preprocess_data(X)
        X_scaled = self.scaler.transform(X_processed)
        return self.model.predict(X_scaled)
    def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
        """Predict risk probabilities."""
        X_processed = self.preprocess_data(X)
        X_scaled = self.scaler.transform(X_processed)
        return self.model.predict_proba(X_scaled)
    def _save_model(self):
        """Persist the model and its feature list."""
        joblib.dump(self.model, f"{self.model_path}.pkl")
        with open(f"{self.model_path}_features.pkl", 'wb') as f:
            pickle.dump(self.feature_columns, f)
        self.logger.info("Model saved successfully")
    def calculate_risk_score(self, data: pd.DataFrame) -> pd.DataFrame:
        """Compute a risk score and a discretized risk level per record."""
        predictions = self.predict_proba(data)
        # Use the positive-class probability as the risk score
        risk_scores = predictions[:, 1] if predictions.shape[1] > 1 else predictions[:, 0]
        result_df = data.copy()
        result_df['risk_score'] = risk_scores
        # Labels are kept in Chinese for consistency with the rule engine
        # and charts: 低=low, 中=medium, 高=high, 极高=critical
        result_df['risk_level'] = pd.cut(
            risk_scores,
            bins=[0, 0.3, 0.6, 0.8, 1.0],
            labels=['低', '中', '高', '极高']
        )
        return result_df
class AnomalyDetector:
    """Per-entity anomaly detector based on Isolation Forests."""
    def __init__(self):
        self.models = {}
        self.scalers = {}
    def fit(self, data: pd.DataFrame, feature_columns: List[str],
            entity_col: str = 'sensor_id'):
        """Train one anomaly detection model per entity."""
        self.feature_columns = feature_columns
        for entity in data[entity_col].unique():
            entity_data = data[data[entity_col] == entity]
            X = entity_data[feature_columns]
            # Standardize per entity
            scaler = StandardScaler()
            X_scaled = scaler.fit_transform(X)
            self.scalers[entity] = scaler
            # Fit an Isolation Forest for this entity
            model = IsolationForest(contamination=0.05, random_state=42)
            model.fit(X_scaled)
            self.models[entity] = model
    def detect(self, data: pd.DataFrame, entity_col: str = 'sensor_id') -> pd.DataFrame:
        """Flag anomalies in new data."""
        results = []
        for entity in data[entity_col].unique():
            if entity not in self.models:
                continue
            entity_data = data[data[entity_col] == entity]
            X = entity_data[self.feature_columns]
            # Standardize with the entity's fitted scaler
            X_scaled = self.scalers[entity].transform(X)
            # Predict anomalies (-1) and anomaly scores
            anomalies = self.models[entity].predict(X_scaled)
            anomaly_scores = self.models[entity].decision_function(X_scaled)
            # Assemble results
            entity_result = entity_data.copy()
            entity_result['is_anomaly'] = anomalies == -1
            entity_result['anomaly_score'] = anomaly_scores
            results.append(entity_result)
        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()
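To make the train/score/detect flow concrete, here is a small smoke-test sketch on synthetic data. The has_issue label, the sensor values, and the demo_predictor model path are illustrative assumptions rather than production settings.
import numpy as np
import pandas as pd
rng = np.random.default_rng(42)
demo = pd.DataFrame({
    'sensor_id': rng.integers(0, 3, 400),
    'timestamp': pd.date_range('2024-01-01', periods=400, freq='h'),
    'value': rng.normal(10.0, 2.0, 400),
})
labels = pd.Series(rng.integers(0, 2, 400), name='has_issue')  # hypothetical label column
predictor = QualityRiskPredictor(model_path='demo_predictor')  # saved in the working directory
predictor.train(demo, labels)
scored = predictor.calculate_risk_score(demo)
detector = AnomalyDetector()
detector.fit(scored, feature_columns=['value', 'risk_score'])
flagged = detector.detect(scored)
print(flagged[['sensor_id', 'risk_score', 'is_anomaly']].head())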
3. Decision Recommendation Generation Module
from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
class DecisionSupportEngine:
def __init__(self, knowledge_base_path: str = "knowledge_base.json"):
self.knowledge_base = self._load_knowledge_base(knowledge_base_path)
self.recommendation_rules = self._load_recommendation_rules()
    def _load_knowledge_base(self, path: str) -> Dict[str, Any]:
        """Load the knowledge base."""
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                "common_issues": [],
                "solutions": [],
                "best_practices": [],
                "regulatory_requirements": []
            }
    def _load_recommendation_rules(self) -> List[Dict[str, Any]]:
        """Load the recommendation rules."""
        # .get() keeps the lambdas safe when a field is absent from a record
        return [
            {
                "condition": lambda data: data.get('risk_level') == '极高',
                "action": self._generate_urgent_recommendation,
                "priority": 1
            },
            {
                "condition": lambda data: data.get('risk_level') == '高',
                "action": self._generate_high_risk_recommendation,
                "priority": 2
            },
            {
                "condition": lambda data: data.get('anomaly_detected') == True,
                "action": self._generate_anomaly_recommendation,
                "priority": 3
            },
            {
                "condition": lambda data: data.get('trend') == 'deteriorating',
                "action": self._generate_trend_recommendation,
                "priority": 4
            }
        ]
    def generate_recommendations(self, risk_data: pd.DataFrame,
                                 context: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Generate decision recommendations."""
        recommendations = []
        context = context or {}
        # Generate a recommendation for each risk record
        for _, row in risk_data.iterrows():
            row_dict = row.to_dict()
            row_dict.update(context)
            # Apply rules in priority order
            for rule in sorted(self.recommendation_rules, key=lambda x: x['priority']):
                try:
                    if rule['condition'](row_dict):
                        recommendation = rule['action'](row_dict)
                        if recommendation:
                            recommendations.append({
                                **recommendation,
                                'entity_id': row_dict.get('sensor_id', row_dict.get('id')),
                                'timestamp': datetime.now(),
                                'priority': rule['priority']
                            })
                        break  # stop after the highest-priority matching rule
                except Exception as e:
                    print(f"Error applying rule: {e}")
                    continue
        # Deduplicate and sort; urgency labels are mapped to an explicit
        # rank so they do not sort alphabetically
        urgency_rank = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
        recommendations = self._deduplicate_recommendations(recommendations)
        recommendations.sort(key=lambda x: (x['priority'], urgency_rank.get(x['urgency'], 4)))
        return recommendations
def _generate_urgent_recommendation(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""生成紧急风险建议"""
issue_type = data.get('issue_type', '未知问题')
return {
"type": "紧急处理",
"title": f"紧急: {issue_type}需要立即处理",
"description": f"检测到{issue_type},风险等级极高,需要立即采取措施",
"actions": [
"立即停止相关作业",
"通知项目经理和安全负责人",
"启动应急预案",
"组织专家现场评估"
],
"urgency": "critical",
"expected_impact": "高风险,可能造成严重安全事故",
"deadline": "立即执行"
}
def _generate_high_risk_recommendation(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""生成高风险建议"""
issue_type = data.get('issue_type', '未知问题')
risk_score = data.get('risk_score', 0)
return {
"type": "优先处理",
"title": f"优先处理: {issue_type}",
"description": f"检测到{issue_type},风险评分{risk_score:.2f},需要优先处理",
"actions": [
"24小时内进行详细检查",
"制定处理方案并报批",
"增加监测频率",
"准备必要的应急资源"
],
"urgency": "high",
"expected_impact": "中等风险,可能影响工程质量和进度",
"deadline": "24小时内"
}
def _generate_anomaly_recommendation(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""生成异常检测建议"""
anomaly_score = data.get('anomaly_score', 0)
metric = data.get('metric', '关键指标')
return {
"type": "异常调查",
"title": f"异常检测: {metric}出现异常",
"description": f"检测到{metric}异常,异常评分{anomaly_score:.2f},需要调查原因",
"actions": [
"检查数据采集设备是否正常",
"分析历史数据 patterns",
"对比设计要求和实际值",
"编写异常分析报告"
],
"urgency": "medium",
"expected_impact": "需要调查的潜在问题",
"deadline": "3天内"
}
def _generate_trend_recommendation(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""生成趋势预警建议"""
metric = data.get('metric', '关键指标')
trend = data.get('trend_direction', '恶化')
return {
"type": "趋势预警",
"title": f"趋势预警: {metric}正在{trend}",
"description": f"检测到{metric}呈现{trend}趋势,需要关注并采取措施",
"actions": [
"加强监测频率",
"分析趋势原因",
"评估长期影响",
"制定预防措施"
],
"urgency": "low",
"expected_impact": "潜在长期风险",
"deadline": "1周内"
}
    def _deduplicate_recommendations(self, recommendations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Deduplicate recommendations."""
        seen = set()
        unique_recommendations = []
        for rec in recommendations:
            # Unique key built from the title and the entity
            identifier = f"{rec['title']}_{rec.get('entity_id', '')}"
            if identifier not in seen:
                seen.add(identifier)
                unique_recommendations.append(rec)
        return unique_recommendations
    def generate_preventive_measures(self, project_type: str,
                                     risk_patterns: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate preventive-measure recommendations."""
        measures = []
        # Measures specific to the project type
        if project_type in self.knowledge_base.get("preventive_measures", {}):
            type_specific_measures = self.knowledge_base["preventive_measures"][project_type]
            measures.extend(type_specific_measures)
        # Measures triggered by observed risk patterns
        for pattern, severity in risk_patterns.items():
            if pattern in self.knowledge_base.get("pattern_based_measures", {}):
                pattern_measures = self.knowledge_base["pattern_based_measures"][pattern]
                # Filter the measures by severity
                adjusted_measures = self._adjust_measures_by_severity(pattern_measures, severity)
                measures.extend(adjusted_measures)
        return measures
    def _adjust_measures_by_severity(self, measures: List[Dict[str, Any]],
                                     severity: str) -> List[Dict[str, Any]]:
        """Filter measures by severity level."""
        severity_levels = {"low": 1, "medium": 2, "high": 3, "critical": 4}
        current_level = severity_levels.get(severity, 1)
        adjusted_measures = []
        for measure in measures:
            measure_severity = measure.get('min_severity', 'low')
            measure_level = severity_levels.get(measure_severity, 1)
            if current_level >= measure_level:
                adjusted_measures.append(measure)
        return adjusted_measures
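A quick usage sketch of the rule engine follows; the record below mimics one row of QualityRiskPredictor.calculate_risk_score output, and the issue type is an illustrative value.
import pandas as pd
engine = DecisionSupportEngine()  # falls back to an empty knowledge base if knowledge_base.json is absent
risk_df = pd.DataFrame([{
    'sensor_id': 'S-01',
    'risk_level': '极高',           # label produced by calculate_risk_score
    'risk_score': 0.92,
    'issue_type': '混凝土强度不足',  # illustrative issue type
    'anomaly_detected': False,
    'trend': 'stable',
}])
recs = engine.generate_recommendations(risk_df, context={'project_id': 'P-100'})
for rec in recs:
    print(rec['priority'], rec['title'])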
4. Standardized Quality Data Management
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List, Any, Optional
import json
import jsonschema
from jsonschema import validate
import xml.etree.ElementTree as ET
import yaml
class QualityDataStandardizer:
def __init__(self, standards_config: str = "standards/config.yaml"):
self.standards = self._load_standards(standards_config)
self.schemas = self._load_schemas()
def _load_standards(self, config_path: str) -> Dict[str, Any]:
"""加载数据标准配置"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except FileNotFoundError:
return {
"data_formats": {
"numeric": {"decimal_places": 2, "units": "SI"},
"temporal": {"format": "ISO8601"},
"categorical": {"encoding": "UTF-8"}
},
"quality_standards": {
"tolerance_levels": {},
"acceptance_criteria": {}
}
}
def _load_schemas(self) -> Dict[str, Any]:
"""加载数据模式"""
schemas = {}
        # Load the JSON Schema for each data type
schema_files = {
"sensor_data": "schemas/sensor_data_schema.json",
"inspection_data": "schemas/inspection_data_schema.json",
"material_test": "schemas/material_test_schema.json"
}
for schema_name, schema_path in schema_files.items():
try:
with open(schema_path, 'r', encoding='utf-8') as f:
schemas[schema_name] = json.load(f)
except FileNotFoundError:
print(f"Schema file not found: {schema_path}")
continue
return schemas
    def standardize_data(self, data: Dict[str, Any], data_type: str) -> Dict[str, Any]:
        """Standardize a record."""
        standardized_data = data.copy()
        # Validate against the schema for this data type
        if data_type in self.schemas:
            try:
                validate(instance=data, schema=self.schemas[data_type])
            except jsonschema.ValidationError as e:
                print(f"Data validation error: {e}")
                # Attempt to repair the record
                standardized_data = self._fix_data_issues(data, data_type, e)
        # Apply data format standards
        standardized_data = self._apply_format_standards(standardized_data, data_type)
        # Apply quality standards
        standardized_data = self._apply_quality_standards(standardized_data, data_type)
        return standardized_data
    def _fix_data_issues(self, data: Dict[str, Any], data_type: str,
                         error: jsonschema.ValidationError) -> Dict[str, Any]:
        """Repair a record based on the validation error."""
        fixed_data = data.copy()
        # Dispatch on the kind of error
        if "required" in error.message:
            # Fill in a missing required field
            missing_field = error.message.split("'")[1]
            fixed_data[missing_field] = self._get_default_value(missing_field, data_type)
        elif "type" in error.message:
            # Coerce a mistyped field
            field_path = error.path[0] if error.path else None
            if field_path and field_path in fixed_data:
                current_value = fixed_data[field_path]
                expected_type = error.validator_value
                fixed_data[field_path] = self._convert_type(current_value, expected_type)
        return fixed_data
    def _get_default_value(self, field: str, data_type: str) -> Any:
        """Return a default value for a field."""
        defaults = {
            "timestamp": datetime.now().isoformat(),
            "value": 0.0,
            "unit": "unknown",
            "status": "pending",
            "quality_rating": "未评级"
        }
        # Defaults specific to the data type
        type_specific_defaults = {
            "sensor_data": {
                "sensor_status": "active",
                "reading_quality": "raw"
            },
            "inspection_data": {
                "inspector_id": "unknown",
                "approval_status": "pending"
            }
        }
        if data_type in type_specific_defaults and field in type_specific_defaults[data_type]:
            return type_specific_defaults[data_type][field]
        return defaults.get(field, None)
def _convert_type(self, value: Any, target_type: str) -> Any:
"""类型转换"""
try:
if target_type == "string":
return str(value)
elif target_type == "number":
return float(value)
elif target_type == "integer":
return int(float(value))
elif target_type == "boolean":
return bool(value)
elif target_type == "array" and not isinstance(value, list):
return [value]
else:
return value
except (ValueError, TypeError):
return self._get_default_value_for_type(target_type)
def _get_default_value_for_type(self, target_type: str) -> Any:
"""获取类型默认值"""
type_defaults = {
"string": "",
"number": 0.0,
"integer": 0,
"boolean": False,
"array": [],
"object": {}
}
return type_defaults.get(target_type, None)
    def _apply_format_standards(self, data: Dict[str, Any], data_type: str) -> Dict[str, Any]:
        """Apply formatting standards."""
        formatted_data = data.copy()
        # Standardize numeric fields
        numeric_fields = self._get_numeric_fields(data_type)
        for field in numeric_fields:
            if field in formatted_data and formatted_data[field] is not None:
                formatted_data[field] = self._format_numeric(
                    formatted_data[field],
                    self.standards["data_formats"]["numeric"]
                )
        # Standardize temporal fields
        temporal_fields = self._get_temporal_fields(data_type)
        for field in temporal_fields:
            if field in formatted_data and formatted_data[field] is not None:
                formatted_data[field] = self._format_temporal(
                    formatted_data[field],
                    self.standards["data_formats"]["temporal"]
                )
        return formatted_data
def _get_numeric_fields(self, data_type: str) -> List[str]:
"""获取数值字段列表"""
# 根据数据类型返回数值字段
field_mapping = {
"sensor_data": ["value", "threshold", "accuracy"],
"inspection_data": ["measurement", "tolerance", "deviation"],
"material_test": ["strength", "density", "moisture_content"]
}
return field_mapping.get(data_type, [])
def _get_temporal_fields(self, data_type: str) -> List[str]:
"""获取时间字段列表"""
field_mapping = {
"sensor_data": ["timestamp", "last_calibration"],
"inspection_data": ["inspection_date", "next_inspection_due"],
"material_test": ["test_date", "curing_start_time"]
}
return field_mapping.get(data_type, [])
def _format_numeric(self, value: Any, format_config: Dict[str, Any]) -> float:
"""格式化数值"""
try:
numeric_value = float(value)
decimal_places = format_config.get("decimal_places", 2)
return round(numeric_value, decimal_places)
except (ValueError, TypeError):
return 0.0
    def _format_temporal(self, value: Any, format_config: Dict[str, Any]) -> str:
        """Format a timestamp as ISO 8601."""
        try:
            if isinstance(value, datetime):
                dt = value
            elif isinstance(value, str):
                # Try a few common formats
                for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y", "%Y%m%dT%H%M%S"):
                    try:
                        dt = datetime.strptime(value, fmt)
                        break
                    except ValueError:
                        continue
                else:
                    raise ValueError(f"Unparseable timestamp format: {value}")
            else:
                raise ValueError(f"Unsupported timestamp type: {type(value)}")
            return dt.isoformat()
        except (ValueError, TypeError):
            return datetime.now().isoformat()
    def _apply_quality_standards(self, data: Dict[str, Any], data_type: str) -> Dict[str, Any]:
        """Apply quality standards."""
        quality_data = data.copy()
        # Rate the record against the configured standards
        if data_type in self.standards.get("quality_standards", {}):
            standards = self.standards["quality_standards"][data_type]
            quality_data["quality_rating"] = self._assess_quality(data, standards)
        return quality_data
    def _assess_quality(self, data: Dict[str, Any], standards: Dict[str, Any]) -> str:
        """Assess data quality as a weighted 0-100 score."""
        score = 0
        # Completeness (30% weight)
        completeness_score = self._assess_completeness(data, standards.get("completeness_criteria", {}))
        score += completeness_score * 0.3
        # Accuracy (40% weight)
        accuracy_score = self._assess_accuracy(data, standards.get("accuracy_criteria", {}))
        score += accuracy_score * 0.4
        # Timeliness (30% weight)
        timeliness_score = self._assess_timeliness(data, standards.get("timeliness_criteria", {}))
        score += timeliness_score * 0.3
        # Ratings are kept in Chinese for consistency with stored records:
        # 优秀=excellent, 良好=good, 合格=acceptable, 不合格=unacceptable
        if score >= 90:
            return "优秀"
        elif score >= 75:
            return "良好"
        elif score >= 60:
            return "合格"
        else:
            return "不合格"
    def _assess_completeness(self, data: Dict[str, Any], criteria: Dict[str, Any]) -> float:
        """Assess completeness."""
        required_fields = criteria.get("required_fields", [])
        if not required_fields:
            return 100  # full marks when no fields are required
        present_count = sum(1 for field in required_fields if field in data and data[field] is not None)
        return (present_count / len(required_fields)) * 100
    def _assess_accuracy(self, data: Dict[str, Any], criteria: Dict[str, Any]) -> float:
        """Assess accuracy."""
        # Check that values fall within their configured ranges
        range_checks = criteria.get("value_ranges", {})
        accuracy_score = 100
        for field, value_range in range_checks.items():
            if field in data and data[field] is not None:
                min_val, max_val = value_range
                if not (min_val <= data[field] <= max_val):
                    accuracy_score -= 20  # deduct 20 points per out-of-range field
        return max(0, accuracy_score)  # never below zero
    def _assess_timeliness(self, data: Dict[str, Any], criteria: Dict[str, Any]) -> float:
        """Assess timeliness."""
        timestamp_field = criteria.get("timestamp_field", "timestamp")
        max_age_hours = criteria.get("max_age_hours", 24)
        if timestamp_field not in data:
            return 0
        try:
            if isinstance(data[timestamp_field], str):
                data_time = datetime.fromisoformat(data[timestamp_field])
            else:
                data_time = data[timestamp_field]
            age_hours = (datetime.now() - data_time).total_seconds() / 3600
            timeliness = max(0, 100 - (age_hours / max_age_hours) * 100)
            return timeliness
        except (ValueError, TypeError):
            return 0
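The schema files under schemas/ are not shown in this document, so the following sketch injects an illustrative sensor_data schema by hand and runs one record through standardize_data; expect the string value to be coerced and rounded, and the timestamp normalized to ISO 8601.
# Illustrative shape for schemas/sensor_data_schema.json; the real schema
# files are assumptions, not reproduced from the source.
sensor_data_schema = {
    "type": "object",
    "required": ["sensor_id", "timestamp", "value"],
    "properties": {
        "sensor_id": {"type": "string"},
        "timestamp": {"type": "string"},
        "value": {"type": "number"},
        "unit": {"type": "string"},
    },
}
standardizer = QualityDataStandardizer()  # falls back to built-in defaults without config files
standardizer.schemas["sensor_data"] = sensor_data_schema
record = {"sensor_id": "S-01", "timestamp": "2024-01-01 08:00:00", "value": "21.437"}
clean = standardizer.standardize_data(record, "sensor_data")
print(clean)  # value coerced to float and rounded; timestamp in ISO 8601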
5. Visual Analytics Platform
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json
class VisualizationEngine:
def __init__(self, theme: str = "light"):
self.theme = theme
self.color_palette = self._get_color_palette(theme)
def _get_color_palette(self, theme: str) -> Dict[str, Any]:
"""获取颜色调色板"""
palettes = {
"light": {
"primary": "#1f77b4",
"secondary": "#ff7f0e",
"success": "#2ca02c",
"warning": "#d62728",
"danger": "#9467bd",
"background": "#ffffff",
"text": "#2c3e50"
},
"dark": {
"primary": "#6baed6",
"secondary": "#fd8d3c",
"success": "#74c476",
"warning": "#e6550d",
"danger": "#9e9ac8",
"background": "#2d3748",
"text": "#e2e8f0"
}
}
return palettes.get(theme, palettes["light"])
    def create_risk_dashboard(self, risk_data: pd.DataFrame,
                              time_range: Dict[str, datetime] = None) -> go.Figure:
        """Create the risk dashboard."""
        if time_range:
            start_date, end_date = time_range.get('start'), time_range.get('end')
            filtered_data = risk_data[
                (risk_data['timestamp'] >= start_date) &
                (risk_data['timestamp'] <= end_date)
            ]
        else:
            filtered_data = risk_data
        # Build the subplot grid
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                '风险趋势图',
                '风险等级分布',
                '风险热点图',
                'TOP 10风险项'
            ),
            specs=[[{"type": "scatter"}, {"type": "pie"}],
                   [{"type": "heatmap"}, {"type": "bar"}]]
        )
        # Risk trend
        trend_fig = self._create_risk_trend_chart(filtered_data)
        for trace in trend_fig.data:
            fig.add_trace(trace, row=1, col=1)
        # Risk-level distribution
        distribution_fig = self._create_risk_distribution_chart(filtered_data)
        for trace in distribution_fig.data:
            fig.add_trace(trace, row=1, col=2)
        # Risk heatmap
        heatmap_fig = self._create_risk_heatmap(filtered_data)
        for trace in heatmap_fig.data:
            fig.add_trace(trace, row=2, col=1)
        # Top 10 risk items
        top_risks_fig = self._create_top_risks_chart(filtered_data)
        for trace in top_risks_fig.data:
            fig.add_trace(trace, row=2, col=2)
        # Final layout
        fig.update_layout(
            height=800,
            showlegend=False,
            title_text="工程质量风险分析仪表盘",
            template="plotly_white" if self.theme == "light" else "plotly_dark"
        )
        return fig
    def _create_risk_trend_chart(self, data: pd.DataFrame) -> go.Figure:
        """Create the risk trend chart."""
        # Aggregate risk data by day
        daily_risk = data.set_index('timestamp').resample('D').agg({
            'risk_score': 'mean',
            'entity_id': 'count'
        }).rename(columns={'entity_id': 'count'}).reset_index()
        fig = go.Figure()
        # Risk score trend line
        fig.add_trace(go.Scatter(
            x=daily_risk['timestamp'],
            y=daily_risk['risk_score'],
            mode='lines+markers',
            name='平均风险分',
            line=dict(color=self.color_palette['primary'], width=2),
            yaxis='y1'
        ))
        # Risk count bars
        fig.add_trace(go.Bar(
            x=daily_risk['timestamp'],
            y=daily_risk['count'],
            name='风险数量',
            marker_color=self.color_palette['secondary'],
            opacity=0.6,
            yaxis='y2'
        ))
        # Dual y-axis configuration
        fig.update_layout(
            yaxis=dict(
                title="风险分数",
                title_font=dict(color=self.color_palette['primary']),
                tickfont=dict(color=self.color_palette['primary']),
                range=[0, 1]
            ),
            yaxis2=dict(
                title="风险数量",
                title_font=dict(color=self.color_palette['secondary']),
                tickfont=dict(color=self.color_palette['secondary']),
                anchor="x",
                overlaying="y",
                side="right"
            )
        )
        return fig
    def _create_risk_distribution_chart(self, data: pd.DataFrame) -> go.Figure:
        """Create the risk-level distribution chart."""
        risk_counts = data['risk_level'].value_counts()
        # Color mapping per risk level
color_map = {
'低': self.color_palette['success'],
'中': self.color_palette['warning'],
'高': self.color_palette['danger'],
'极高': '#ff0000'
}
colors = [color_map.get(level, self.color_palette['primary'])
for level in risk_counts.index]
fig = go.Figure(data=[go.Pie(
labels=risk_counts.index,
values=risk_counts.values,
hole=.3,
marker_colors=colors
)])
fig.update_traces(
textinfo='percent+label',
pull=[0.1 if level == '极高' else 0 for level in risk_counts.index]
)
return fig
    def _create_risk_heatmap(self, data: pd.DataFrame) -> go.Figure:
        """Create the hour-by-weekday risk heatmap."""
        # Derive hour/day-of-week on a copy to avoid mutating the caller's frame
        data = data.copy()
        data['hour'] = data['timestamp'].dt.hour
        data['day_of_week'] = data['timestamp'].dt.dayofweek
        heatmap_data = data.pivot_table(
values='risk_score',
index='day_of_week',
columns='hour',
aggfunc='mean',
fill_value=0
)
days = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
hours = [f"{h:02d}:00" for h in range(24)]
fig = go.Figure(data=go.Heatmap(
z=heatmap_data.values,
x=hours,
y=[days[i] for i in heatmap_data.index],
colorscale='Reds',
showscale=True
))
fig.update_layout(
xaxis_title="小时",
yaxis_title="星期"
)
return fig
def _create_top_risks_chart(self, data: pd.DataFrame) -> go.Figure:
"""创建TOP风险项图表"""
top_risks = data.groupby('issue_type').agg({
'risk_score': 'mean',
'entity_id': 'count'
}).rename(columns={'entity_id': 'count'}).nlargest(10, 'risk_score')
fig = go.Figure()
fig.add_trace(go.Bar(
x=top_risks['risk_score'],
y=top_risks.index,
orientation='h',
marker_color=self.color_palette['danger'],
name='平均风险分'
))
        # Add count annotations
for i, (risk, row) in enumerate(top_risks.iterrows()):
fig.add_annotation(
x=row['risk_score'] + 0.05,
y=risk,
text=str(row['count']),
showarrow=False,
font=dict(color=self.color_palette['text'])
)
fig.update_layout(
xaxis_title="平均风险分数",
yaxis_title="风险类型",
showlegend=False
)
return fig
    def create_geospatial_view(self, data: pd.DataFrame) -> go.Figure:
        """Create a geospatial scatter view of risk points."""
        if 'latitude' not in data.columns or 'longitude' not in data.columns:
            raise ValueError("Data lacks latitude/longitude information")
        # Scatter map of risk points
fig = px.scatter_mapbox(
data,
lat="latitude",
lon="longitude",
color="risk_level",
size="risk_score",
hover_name="issue_type",
hover_data=["risk_score", "timestamp"],
color_discrete_map={
"低": self.color_palette['success'],
"中": self.color_palette['warning'],
"高": self.color_palette['danger'],
"极高": "#ff0000"
},
zoom=10,
height=600
)
fig.update_layout(
mapbox_style="open-street-map",
margin={"r":0,"t":0,"l":0,"b":0}
)
return fig
def create_comparative_analysis(self, current_data: pd.DataFrame,
historical_data: pd.DataFrame) -> go.Figure:
"""创建对比分析图表"""
fig = make_subplots(
rows=1, cols=2,
subplot_titles=('当前周期', '历史同期'),
specs=[[{"type": "bar"}, {"type": "bar"}]]
)
        # Current-period risk distribution
current_counts = current_data['risk_level'].value_counts()
fig.add_trace(go.Bar(
x=current_counts.index,
y=current_counts.values,
name='当前周期',
marker_color=[self.color_palette['success'],
self.color_palette['warning'],
self.color_palette['danger'],
'#ff0000']
), row=1, col=1)
        # Historical same-period risk distribution
historical_counts = historical_data['risk_level'].value_counts()
fig.add_trace(go.Bar(
x=historical_counts.index,
y=historical_counts.values,
name='历史同期',
marker_color=[self.color_palette['success'],
self.color_palette['warning'],
self.color_palette['danger'],
'#ff0000']
), row=1, col=2)
fig.update_layout(
title_text="风险分布对比分析",
showlegend=False
)
return fig
    def export_report(self, figures: Dict[str, go.Figure],
                      data: pd.DataFrame, format: str = "html") -> str:
        """Export an analysis report."""
        if format == "html":
            return self._generate_html_report(figures, data)
        elif format == "pdf":
            # PDF rendering helper not shown in this document
            return self._generate_pdf_report(figures, data)
        else:
            raise ValueError(f"Unsupported format: {format}")
def _generate_html_report(self, figures: Dict[str, go.Figure],
data: pd.DataFrame) -> str:
"""生成HTML报告"""
report_date = datetime.now().strftime("%Y年%m月%d日")
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>工程质量风险分析报告 - {report_date}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
.header {{ text-align: center; margin-bottom: 30px; }}
.section {{ margin-bottom: 40px; }}
.chart {{ margin: 20px 0; }}
.summary {{ background-color: #f5f5f5; padding: 20px; border-radius: 5px; }}
table {{ width: 100%; border-collapse: collapse; }}
th, td {{ padding: 12px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background-color: {self.color_palette['primary']}; color: white; }}
</style>
</head>
<body>
<div class="header">
<h1>工程质量风险分析报告</h1>
<p>生成日期: {report_date}</p>
</div>
<div class="section">
<h2>执行摘要</h2>
<div class="summary">
{self._generate_executive_summary(data)}
</div>
</div>
<div class="section">
<h2>风险分析仪表盘</h2>
<div class="chart">
                {figures['dashboard'].to_html(full_html=False, include_plotlyjs='cdn')}
</div>
</div>
</body>
</html>
"""
return html_content
def _generate_executive_summary(self, data: pd.DataFrame) -> str:
"""生成执行摘要"""
total_risks = len(data)
high_risks = len(data[data['risk_level'].isin(['高', '极高'])])
avg_risk_score = data['risk_score'].mean()
summary = f"""
<p>本期共检测到 <strong>{total_risks}</strong> 个风险点,其中高风险项目 <strong>{high_risks}</strong> 个。</p>
<p>平均风险分数: <strong>{avg_risk_score:.2f}</strong></p>
<p>主要风险类型: {', '.join(data['issue_type'].value_counts().head(3).index.tolist())}</p>
"""
return summary
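As a usage sketch, the engine can be exercised end to end on synthetic data; the issue types and score distribution below are illustrative only.
import numpy as np
import pandas as pd
rng = np.random.default_rng(7)
n = 200
df = pd.DataFrame({
    'timestamp': pd.date_range('2024-03-01', periods=n, freq='h'),
    'entity_id': rng.integers(1, 20, n),
    'issue_type': rng.choice(['裂缝', '渗漏', '沉降'], n),  # illustrative issue types
    'risk_score': rng.uniform(0, 1, n),
})
df['risk_level'] = pd.cut(df['risk_score'], bins=[0, 0.3, 0.6, 0.8, 1.0],
                          labels=['低', '中', '高', '极高'])
engine = VisualizationEngine(theme="light")
dashboard = engine.create_risk_dashboard(df)
dashboard.write_html("dashboard_demo.html")  # open in a browser to inspect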
6. Cross-Platform Synchronization Service
import asyncio
import aiohttp
from aiohttp import web
import json
from datetime import datetime
from typing import Dict, List, Any, Optional
import redis
import pickle
from websockets import connect
import ssl
class CrossPlatformSyncService:
def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=0)
self.websocket_connections = {}
self.sync_queues = {}
    async def start_websocket_server(self, host: str = '0.0.0.0', port: int = 8765):
        """Start the WebSocket server."""
        app = web.Application()
        app.router.add_get('/ws', self.websocket_handler)
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()
        print(f"WebSocket server started on ws://{host}:{port}")
    async def websocket_handler(self, request):
        """Handle a WebSocket connection."""
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        # Identify the client
        client_id = request.query.get('client_id', 'unknown')
        platform = request.query.get('platform', 'unknown')
        print(f"Client connected: {client_id} ({platform})")
        self.websocket_connections[client_id] = ws
        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await self.handle_message(client_id, msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error with {client_id}: {ws.exception()}")
        finally:
            # pop() tolerates a client that was already removed
            self.websocket_connections.pop(client_id, None)
            print(f"Client disconnected: {client_id}")
        return ws
    async def handle_message(self, client_id: str, message: str):
        """Handle an incoming message."""
        try:
            data = json.loads(message)
            message_type = data.get('type')
            if message_type == 'sync_request':
                await self.handle_sync_request(client_id, data)
            elif message_type == 'data_update':
                await self.handle_data_update(client_id, data)
            elif message_type == 'conflict_resolution':
                # Handler for client-side conflict resolutions (omitted here)
                await self.handle_conflict_resolution(client_id, data)
            else:
                print(f"Unknown message type: {message_type}")
        except json.JSONDecodeError as e:
            print(f"Invalid JSON message from {client_id}: {e}")
    async def handle_sync_request(self, client_id: str, data: Dict[str, Any]):
        """Handle a sync request."""
        sync_token = data.get('sync_token')
        platform = data.get('platform')
        # Collect the changes since the client's last sync
        changes = await self.get_changes_since(sync_token)
        # Reply with the changes and a fresh token
        response = {
            'type': 'sync_response',
            'changes': changes,
            'new_sync_token': datetime.now().isoformat()
        }
        await self.send_to_client(client_id, response)
    async def get_changes_since(self, sync_token: str) -> List[Dict[str, Any]]:
        """Return the changes recorded since the given sync token."""
        if not sync_token:
            # First sync: return everything
            return await self.get_all_data()
        # Look up the change log in Redis
        changes_key = f"changes_since:{sync_token}"
        changes_data = self.redis_client.get(changes_key)
        if changes_data:
            return pickle.loads(changes_data)
        else:
            # Token expired or unknown: fall back to a full sync
            return await self.get_all_data()
    async def get_all_data(self) -> List[Dict[str, Any]]:
        """Return all syncable data."""
        # Fetch everything that needs syncing from the database or cache;
        # simplified to an empty list here
        return []
    async def handle_data_update(self, client_id: str, data: Dict[str, Any]):
        """Handle a data update."""
        update_data = data.get('data', {})
        update_type = data.get('update_type')  # create, update, delete
        # Validate and process the update
        processed_data = await self.process_update(update_data, update_type, client_id)
        if processed_data:
            # Persist the update
            await self.store_update(processed_data)
            # Broadcast to the other clients
            await self.broadcast_update({
                'type': 'data_update',
                'data': processed_data,
                'update_type': update_type,
                'source_client': client_id,
                'timestamp': datetime.now().isoformat()
            })
    async def process_update(self, data: Dict[str, Any], update_type: str,
                             client_id: str) -> Optional[Dict[str, Any]]:
        """Validate and process an update."""
        # Validate the payload
        if not self.validate_data(data):
            print(f"Invalid data from client {client_id}")
            return None
        # Conflict detection
        if update_type in ['update', 'delete']:
            conflict = await self.detect_conflict(data, client_id)
            if conflict:
                await self.resolve_conflict(conflict, client_id)
                return None
        # Attach metadata
        processed_data = data.copy()
        processed_data['last_modified'] = datetime.now().isoformat()
        processed_data['modified_by'] = client_id
        processed_data['version'] = data.get('version', 0) + 1
        return processed_data
    def validate_data(self, data: Dict[str, Any]) -> bool:
        """Validate the payload."""
        required_fields = ['id', 'type', 'timestamp']
        for field in required_fields:
            if field not in data:
                return False
        # Extend with further validation as needed
        return True
    async def detect_conflict(self, data: Dict[str, Any], client_id: str) -> Optional[Dict[str, Any]]:
        """Detect a data conflict."""
        current_data = await self.get_current_data(data['id'])
        if not current_data:
            return None
        # Version conflict
        if data.get('version', 0) < current_data.get('version', 0):
            return {
                'type': 'version_conflict',
                'current_data': current_data,
                'incoming_data': data,
                'client_id': client_id
            }
        # Timestamp conflict (client clocks may be skewed)
        current_timestamp = current_data.get('last_modified')
        incoming_timestamp = data.get('last_modified')
        if current_timestamp and incoming_timestamp:
            current_dt = datetime.fromisoformat(current_timestamp)
            incoming_dt = datetime.fromisoformat(incoming_timestamp)
            if incoming_dt < current_dt:
                return {
                    'type': 'timestamp_conflict',
                    'current_data': current_data,
                    'incoming_data': data,
                    'client_id': client_id
                }
        return None
    async def resolve_conflict(self, conflict: Dict[str, Any], client_id: str):
        """Resolve a data conflict."""
        # Notify the client and let it decide how to resolve
        conflict_message = {
            'type': 'conflict_detected',
            'conflict': conflict,
            'timestamp': datetime.now().isoformat()
        }
        await self.send_to_client(client_id, conflict_message)
    async def get_current_data(self, data_id: str) -> Optional[Dict[str, Any]]:
        """Return the currently stored version of a record."""
        # Fetch from the database or cache; simplified to None here
        return None
    async def store_update(self, data: Dict[str, Any]):
        """Persist an update."""
        # Store the record and append to the change history
        change_record = {
            'data_id': data['id'],
            'action': 'update',
            'timestamp': datetime.now(),
            'data': data
        }
        # Append to the Redis change log
        change_key = f"change:{data['id']}:{datetime.now().timestamp()}"
        self.redis_client.set(change_key, pickle.dumps(change_record))
    async def broadcast_update(self, update: Dict[str, Any]):
        """Broadcast an update to all connected clients."""
        message = json.dumps(update)
        # Iterate over a snapshot; clients may disconnect while we send
        for client_id, ws in list(self.websocket_connections.items()):
            if client_id != update.get('source_client'):
                try:
                    await ws.send_str(message)
                except Exception as e:
                    print(f"Error sending to client {client_id}: {e}")
    async def send_to_client(self, client_id: str, message: Dict[str, Any]):
        """Send a message to a specific client."""
        if client_id in self.websocket_connections:
            try:
                await self.websocket_connections[client_id].send_str(json.dumps(message))
            except Exception as e:
                print(f"Error sending to client {client_id}: {e}")
        else:
            print(f"Client {client_id} not connected")
    async def sync_with_cloud(self, cloud_endpoint: str, api_key: str):
        """Synchronize with the cloud service."""
        ssl_context = ssl.create_default_context()
        async with connect(cloud_endpoint, ssl=ssl_context) as websocket:
            # Authenticate
            auth_message = {
                'type': 'auth',
                'api_key': api_key,
                'timestamp': datetime.now().isoformat()
            }
            await websocket.send(json.dumps(auth_message))
            # Process messages from the cloud service
            async for message in websocket:
                data = json.loads(message)
                await self.handle_cloud_message(data, websocket)
    async def handle_cloud_message(self, data: Dict[str, Any], websocket):
        """Handle a message from the cloud service."""
        message_type = data.get('type')
        if message_type == 'sync_request':
            # Cloud-initiated sync request
            await self.handle_cloud_sync_request(data, websocket)
        elif message_type == 'data_update':
            # Data update pushed by the cloud
            await self.handle_cloud_data_update(data)
    async def handle_cloud_sync_request(self, data: Dict[str, Any], websocket):
        """Handle a sync request from the cloud service."""
        # Send local changes up to the cloud
        changes = await self.get_changes_since(data.get('sync_token'))
        response = {
            'type': 'sync_response',
            'changes': changes,
            'new_sync_token': datetime.now().isoformat()
        }
        await websocket.send(json.dumps(response))
    async def handle_cloud_data_update(self, data: Dict[str, Any]):
        """Handle a data update from the cloud service."""
        # Apply the cloud update locally
        update_data = data.get('data', {})
        update_type = data.get('update_type')
        processed_data = await self.process_update(update_data, update_type, 'cloud')
        if processed_data:
            await self.store_update(processed_data)
            # Broadcast to all connected clients (the cloud excluded)
            await self.broadcast_update({
                'type': 'data_update',
                'data': processed_data,
                'update_type': update_type,
                'source_client': 'cloud',
                'timestamp': datetime.now().isoformat()
            })
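Running the service standalone might look like the following sketch; it assumes a Redis instance reachable at localhost:6379 once sync traffic starts, and it keeps the event loop alive after the WebSocket server is up.
import asyncio
async def main():
    service = CrossPlatformSyncService(redis_host="localhost", redis_port=6379)
    await service.start_websocket_server(host="0.0.0.0", port=8765)
    # Keep the process alive; clients connect to ws://<host>:8765/ws?client_id=...
    await asyncio.Event().wait()
if __name__ == "__main__":
    asyncio.run(main())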
System Integration and Deployment
System Configuration
# config/system_config.yaml
database:
host: localhost
port: 5432
name: construction_quality_db
user: admin
password: password
redis:
host: localhost
port: 6379
ml_models:
risk_predictor:
path: models/risk_predictor.pkl
    retrain_interval: 86400  # retrain once per day (seconds)
anomaly_detector:
contamination: 0.05
random_state: 42
api:
host: 0.0.0.0
port: 8000
workers: 4
log_level: info
sync_service:
websocket:
host: 0.0.0.0
port: 8765
cloud:
endpoint: wss://cloud-sync.example.com/ws
api_key: your_api_key_here
data_standards:
schema_path: schemas/
validation:
strict_mode: true
auto_fix: true
visualization:
theme: light
export_formats: [html, pdf, png]
  default_time_range: 30  # days
Main Application
import asyncio
import uvicorn
import pandas as pd
from datetime import datetime, timedelta
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from typing import Dict, List, Any
import yaml
import logging
from logging.config import dictConfig
from data_collector import DataCollector
from risk_predictor import QualityRiskPredictor, AnomalyDetector
from decision_support import DecisionSupportEngine
from data_standardizer import QualityDataStandardizer
from visualization_engine import VisualizationEngine
from sync_service import CrossPlatformSyncService
# Logging configuration
log_config = {
"version": 1,
"formatters": {
"default": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "default",
"level": "INFO"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "default",
"filename": "logs/quality_system.log",
"maxBytes": 10485760, # 10MB
"backupCount": 5,
"level": "DEBUG"
}
},
"root": {
"handlers": ["console", "file"],
"level": "INFO"
}
}
dictConfig(log_config)
logger = logging.getLogger(__name__)
# Load configuration
with open('config/system_config.yaml', 'r') as f:
config = yaml.safe_load(f)
# Global component instances
data_collector = None
risk_predictor = None
anomaly_detector = None
decision_engine = None
data_standardizer = None
visualization_engine = None
sync_service = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management."""
    # Initialize on startup
    global data_collector, risk_predictor, anomaly_detector
    global decision_engine, data_standardizer, visualization_engine, sync_service
    logger.info("Initializing system components...")
    # Initialize the components
    data_collector = DataCollector(config['database'])
    risk_predictor = QualityRiskPredictor(config['ml_models']['risk_predictor']['path'])
    anomaly_detector = AnomalyDetector()
    decision_engine = DecisionSupportEngine()
    data_standardizer = QualityDataStandardizer()
    visualization_engine = VisualizationEngine(config['visualization']['theme'])
    sync_service = CrossPlatformSyncService(
        config['redis']['host'], config['redis']['port']
    )
    # Launch the background tasks
    asyncio.create_task(data_collector.integrate_data())
    asyncio.create_task(sync_service.start_websocket_server(
        config['sync_service']['websocket']['host'],
        config['sync_service']['websocket']['port']
    ))
    asyncio.create_task(sync_service.sync_with_cloud(
        config['sync_service']['cloud']['endpoint'],
        config['sync_service']['cloud']['api_key']
    ))
    logger.info("System initialization completed")
    yield
    # Clean up on shutdown
    logger.info("Shutting down system...")
    # Release resources here
# Create the FastAPI application
app = FastAPI(
title="工程建设质量风险预测与决策支持系统",
description="AI驱动的工程质量风险管理平台",
version="1.0.0",
lifespan=lifespan
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# API routes
@app.get("/")
async def root():
return {"message": "工程建设质量风险预测与决策支持系统"}
@app.post("/api/data/collect")
async def collect_data(data: Dict[str, Any], data_type: str):
"""数据采集接口"""
try:
# 标准化数据
standardized_data = data_standardizer.standardize_data(data, data_type)
# 根据数据类型路由到不同的采集方法
if data_type == "sensor":
await data_collector.collect_sensor_data([standardized_data])
elif data_type == "manual":
await data_collector.collect_manual_data(standardized_data)
else:
raise HTTPException(status_code=400, detail=f"Unknown data type: {data_type}")
return {"status": "success", "message": "Data collected successfully"}
except Exception as e:
logger.error(f"Error collecting data: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/risk/predict")
async def predict_risk(project_id: str, start_date: str = None, end_date: str = None):
"""风险预测接口"""
try:
# 获取项目数据
project_data = await _get_project_data(project_id, start_date, end_date)
if project_data.empty:
return {"warning": "No data available for prediction"}
# 风险预测
risk_results = risk_predictor.calculate_risk_score(project_data)
# 异常检测
anomaly_results = anomaly_detector.detect(risk_results)
# 生成决策建议
recommendations = decision_engine.generate_recommendations(anomaly_results, {
'project_id': project_id,
'time_range': {'start': start_date, 'end': end_date}
})
return {
"risk_assessment": risk_results.to_dict('records'),
"anomaly_detection": anomaly_results.to_dict('records'),
"recommendations": recommendations
}
except Exception as e:
logger.error(f"Error predicting risk: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/visualization/dashboard")
async def get_dashboard(project_id: str, time_range: str = "30d"):
"""获取可视化仪表盘"""
try:
# 获取数据
end_date = datetime.now()
if time_range.endswith('d'):
days = int(time_range[:-1])
start_date = end_date - timedelta(days=days)
else:
start_date = end_date - timedelta(days=30) # 默认30天
project_data = await _get_project_data(project_id, start_date, end_date)
if project_data.empty:
return {"warning": "No data available for visualization"}
# 风险预测
risk_results = risk_predictor.calculate_risk_score(project_data)
# 创建可视化
dashboard_fig = visualization_engine.create_risk_dashboard(
risk_results,
{'start': start_date, 'end': end_date}
)
# 转换为JSON
dashboard_json = dashboard_fig.to_json()
return {"dashboard": dashboard_json}
except Exception as e:
logger.error(f"Error creating dashboard: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/report/generate")
async def generate_report(project_id: str, report_format: str = "html"):
"""生成分析报告"""
try:
# 获取数据
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
project_data = await _get_project_data(project_id, start_date, end_date)
if project_data.empty:
return {"warning": "No data available for report generation"}
# 风险预测
risk_results = risk_predictor.calculate_risk_score(project_data)
# 创建可视化图表
figures = {
'dashboard': visualization_engine.create_risk_dashboard(risk_results),
'geospatial': visualization_engine.create_geospatial_view(risk_results)
}
# 生成报告
report_content = visualization_engine.export_report(
figures, risk_results, report_format
)
return {
"report_content": report_content,
"format": report_format
}
except Exception as e:
logger.error(f"Error generating report: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def _get_project_data(project_id: str, start_date: datetime = None,
                            end_date: datetime = None) -> pd.DataFrame:
    """Load project data (mock implementation)."""
    # Production code should query the database;
    # an empty DataFrame is returned here as a placeholder
    return pd.DataFrame()
# Application entry point
if __name__ == "__main__":
uvicorn.run(
app,
host=config['api']['host'],
port=config['api']['port'],
log_level=config['api']['log_level']
)
System Deployment and Operations
Docker Deployment Configuration
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*
# Copy the dependency list and install
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy the application code
COPY . .
# Create the log directory
RUN mkdir -p logs
# Expose the API and WebSocket ports
EXPOSE 8000 8765
# main.py starts uvicorn itself with the configured host and port
CMD ["python", "main.py"]
# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
- "8765:8765"
depends_on:
- db
- redis
environment:
- DATABASE_URL=postgresql://user:password@db:5432/quality_db
- REDIS_URL=redis://redis:6379/0
volumes:
- ./models:/app/models
- ./logs:/app/logs
restart: unless-stopped
db:
image: postgres:13
environment:
- POSTGRES_DB=quality_db
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
redis:
image: redis:6-alpine
volumes:
- redis_data:/data
restart: unless-stopped
volumes:
postgres_data:
redis_data:
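The technology stack lists Kubernetes, but no manifests are shown above. As a rough starting point, a minimal Deployment could look like the sketch below; the image name, replica count, and secret name are illustrative assumptions.
# k8s/deployment.yaml (sketch)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: quality-system
spec:
  replicas: 2
  selector:
    matchLabels:
      app: quality-system
  template:
    metadata:
      labels:
        app: quality-system
    spec:
      containers:
        - name: web
          image: registry.example.com/quality-system:1.0.0  # hypothetical image
          ports:
            - containerPort: 8000   # REST API
            - containerPort: 8765   # WebSocket sync
          envFrom:
            - secretRef:
                name: quality-system-secrets  # hypothetical secret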
Monitoring and Logging
# monitoring.py
import logging
from prometheus_client import start_http_server, Counter, Gauge, Histogram
import time
from typing import Dict, Any
# Define monitoring metrics
REQUESTS_COUNTER = Counter('http_requests_total', 'Total HTTP Requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration', ['endpoint'])
ACTIVE_USERS = Gauge('active_users', 'Number of active users')
RISK_PREDICTIONS = Counter('risk_predictions_total', 'Total risk predictions', ['risk_level'])
DATA_COLLECTION_ERRORS = Counter('data_collection_errors_total', 'Data collection errors')
class MonitoringMiddleware:
"""监控中间件"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope['type'] != 'http':
return await self.app(scope, receive, send)
start_time = time.time()
method = scope['method']
endpoint = scope['path']
        # Wrap send to capture the response status code
async def wrapped_send(response):
if response['type'] == 'http.response.start':
status = response['status']
REQUESTS_COUNTER.labels(method=method, endpoint=endpoint, status=status).inc()
await send(response)
try:
await self.app(scope, receive, wrapped_send)
finally:
duration = time.time() - start_time
REQUEST_DURATION.labels(endpoint=endpoint).observe(duration)
def setup_monitoring(port: int = 9090):
"""设置监控"""
start_http_server(port)
logging.info(f"Monitoring server started on port {port}")
def track_risk_prediction(risk_level: str):
"""跟踪风险预测"""
RISK_PREDICTIONS.labels(risk_level=risk_level).inc()
def track_data_collection_error():
"""跟踪数据收集错误"""
DATA_COLLECTION_ERRORS.inc()
def update_active_users(count: int):
"""更新活跃用户数"""
ACTIVE_USERS.set(count)
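Wiring this into the FastAPI application takes one line each for the middleware and the metrics exporter. The sketch below is standalone for clarity; in practice it would use the app object from main.py.
from fastapi import FastAPI
app = FastAPI()
app.add_middleware(MonitoringMiddleware)  # Starlette instantiates it as MonitoringMiddleware(app)
setup_monitoring(port=9090)  # metrics exposed at http://localhost:9090/ for Prometheus to scrape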
Summary
This document has described a complete AI quality risk prediction and decision support system for the construction industry, with the following characteristics:
- Comprehensive data collection and integration: multi-source acquisition with intelligent data fusion
- Strong risk prediction: machine learning and anomaly detection to surface quality risks early
- Intelligent decision support: actionable mitigation plans generated from a knowledge base and rule engine
- Standardized data management: enforced data quality and consistency
- Rich visual analytics: multi-dimensional, interactive data visualization
- Cross-platform synchronization: real-time multi-device data sync and collaboration
The microservice architecture keeps the system extensible and maintainable, making it suitable for quality management on large construction projects. By applying AI, the system identifies potential risks ahead of time, helping engineering teams take preventive action, improve construction quality, and reduce the cost of risk.
For an actual deployment, model parameters, data standards, and business rules must be tuned to the specific project, and the system should be integrated with existing project management systems. Continuous model retraining and system tuning should also be institutionalized so the system keeps pace with changing site conditions and quality requirements.