138-基于FLask的重庆市造价工程信息数据可视化分析系统

发布于:2025-08-20 ⋅ 阅读:(21) ⋅ 点赞:(0)

造价信息可视化分析系统:从数据爬取到智能分析的完整解决方案

本文详细介绍了一个基于Flask的造价信息可视化分析系统,该系统集成了数据爬取、数据清洗、数据存储、数据分析和可视化展示等完整功能模块。通过Python爬虫技术获取重庆市各类建筑材料价格信息,结合Flask Web框架和现代化前端技术,为用户提供直观、高效的价格查询和分析服务。

📋 目录

  • 项目概述
  • 技术架构
  • 核心功能模块
  • 数据模型设计
  • 爬虫系统实现
  • 可视化分析功能
  • 系统部署与优化
  • 项目特色与创新
  • 技术难点与解决方案
  • 未来发展规划
  • 总结与感悟

🎯 项目概述

项目背景

随着建筑行业的快速发展,材料价格信息的及时性和准确性对工程造价控制至关重要。传统的价格查询方式存在信息滞后、查询不便、分析功能缺失等问题。本项目旨在构建一个集数据采集、存储、分析和可视化于一体的造价信息管理系统。

项目目标

  • 实现多源造价数据的自动化采集和清洗
  • 构建统一的数据存储和管理平台
  • 提供直观的数据可视化分析界面
  • 支持多维度价格趋势分析和预测
  • 为工程造价决策提供数据支撑

应用场景

  • 工程造价预算编制
  • 材料价格趋势分析
  • 成本控制决策支持
  • 招投标价格参考
  • 行业价格监测

项目演示

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
完整项目演示视频如下:

基于Python的造价工程数据可视化分析系统

🏗️ 技术架构

整体架构图

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   前端展示层     │    │   业务逻辑层     │    │   数据存储层     │
│                 │    │                 │    │                 │
│  HTML5 + CSS3   │◄──►│   Flask +       │◄──►│   MySQL +       │
│  JavaScript     │    │   SQLAlchemy    │    │   SQLAlchemy    │
│  Chart.js       │    │   Blueprint     │    │   ORM           │
│  Bootstrap      │    │   RESTful API   │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         │              ┌─────────────────┐              │
         │              │   数据处理层     │              │
         │              │                 │              │
         └─────────────►│   Pandas +      │◄─────────────┘
                        │   NumPy +       │
                        │   PyEcharts     │
                        └─────────────────┘

技术栈选型

后端技术
  • Web框架: Flask 2.3.2 - 轻量级、灵活的Python Web框架
  • 数据库ORM: SQLAlchemy 1.4.52 - Python最强大的ORM框架
  • 数据库: MySQL 8.0 - 成熟稳定的关系型数据库
  • 邮件服务: Flask-Mail - 支持邮件验证码功能
  • 数据爬取: Requests + BeautifulSoup4 - 高效的数据采集工具
前端技术
  • UI框架: Bootstrap 5 - 响应式前端框架
  • 图表库: Chart.js + PyEcharts - 强大的数据可视化库
  • 图标库: Font Awesome + Material Design Icons - 丰富的图标资源
  • 交互组件: jQuery + SweetAlert2 - 增强用户体验
数据处理
  • 数据分析: Pandas 2.0.3 + NumPy 1.24.3 - 数据处理和分析
  • 机器学习: Scikit-learn 1.3.0 - 基础机器学习算法
  • 自然语言处理: Jieba + SnowNLP - 中文文本处理
  • 数据可视化: Matplotlib 3.7.2 + PyEcharts 2.0.5

🔧 核心功能模块

1. 用户管理系统

用户模型设计
class User(db.Model):
    __tablename__ = "user"
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(255), nullable=False, unique=True)
    password = db.Column(db.String(255), nullable=False)
    email = db.Column(db.String(255), nullable=False, unique=True)
    phone = db.Column(db.String(20), nullable=False)
    profile_picture = db.Column(db.String(255), nullable=True)
    reset_token = db.Column(db.String(255), nullable=True)
    
    def __repr__(self):
        return f"<User {self.username}>"
    
    def set_password(self, password):
        """密码加密存储"""
        self.password = generate_password_hash(password)
    
    def check_password(self, password):
        """密码验证"""
        return check_password_hash(self.password, password)
用户注册功能实现
@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        email = request.form.get('email')
        phone = request.form.get('phone')
        
        # 数据验证
        if not all([username, password, email, phone]):
            flash('所有字段都是必填的', 'error')
            return render_template('register.html')
        
        # 检查用户名是否已存在
        if User.query.filter_by(username=username).first():
            flash('用户名已存在', 'error')
            return render_template('register.html')
        
        # 检查邮箱是否已存在
        if User.query.filter_by(email=email).first():
            flash('邮箱已被注册', 'error')
            return render_template('register.html')
        
        # 创建新用户
        user = User(
            username=username,
            email=email,
            phone=phone,
            profile_picture="../static/image/user/default-avatar.png"
        )
        user.set_password(password)
        
        try:
            db.session.add(user)
            db.session.commit()
            flash('注册成功!请登录', 'success')
            return redirect(url_for('login'))
        except Exception as e:
            db.session.rollback()
            flash('注册失败,请重试', 'error')
            print(f"注册错误: {e}")
    
    return render_template('register.html')
邮箱验证码系统
class EmailCaptchaModel(db.Model):
    __tablename__ = 'email_captcha'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(100), nullable=False)
    captcha = db.Column(db.String(100), nullable=False)
    used = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def is_expired(self, expire_minutes=10):
        """检查验证码是否过期"""
        return datetime.utcnow() > self.created_at + timedelta(minutes=expire_minutes)

@app.route('/captcha/email')
def get_email_captcha():
    email = request.args.get('email')
    
    # 验证邮箱格式
    if not email or '@' not in email:
        return jsonify({"code": 400, "message": "邮箱格式不正确", "data": None})
    
    # 生成随机验证码
    source = string.digits * 4
    captcha = "".join(random.sample(source, 4))
    
    # 发送邮件
    try:
        message = Message(
            subject="造价信息可视化分析系统注册验证码",
            recipients=[email],
            body=f"欢迎注册,您的验证码是:{captcha},请尽快前往系统进行验证,避免失效!"
        )
        mail.send(message)
        
        # 保存验证码到数据库
        email_captcha = EmailCaptchaModel(email=email, captcha=captcha)
        db.session.add(email_captcha)
        db.session.commit()
        
        return jsonify({"code": 200, "message": "验证码发送成功", "data": None})
    except Exception as e:
        return jsonify({"code": 500, "message": f"发送失败: {str(e)}", "data": None})

功能特性:

  • 用户注册与登录
  • 邮箱验证码验证
  • 密码重置功能
  • 个人资料管理
  • 头像上传功能
  • 会话管理
  • 权限控制

2. 数据爬取系统

爬虫核心配置与策略
import time
import requests
import csv
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='spider.log'
)

class ConstructionPriceSpider:
    def __init__(self):
        self.ua = UserAgent()
        self.session = requests.Session()
        self.base_url = "http://www.cqsgczjxx.org/Service/MaterialPriceQuerySvr.svrx/QueryInfoPrice"
        
        # 支持的区域列表
        self.citys = [
            '主城区', '万州区', '涪陵区', '黔江区', '长寿区', '江津区', '合川区',
            '永川区', '南川区', '梁平区', '城口县', '丰都县', '垫江县', '忠县',
            '开州区', '云阳县', '奉节县', '巫山县', '巫溪县', '石柱县', '秀山县',
            '酉阳县', '大足区', '綦江区', '万盛经开区', '双桥经开区', '铜梁区',
            '璧山区', '彭水县1', '彭水县2', '彭水县3', '荣昌区1', '荣昌区2',
            '潼南区', '武隆区'
        ]
        
        # 数据存储
        self.data_buffer = []
        self.buffer_size = 1000
        
    def get_headers(self):
        """动态生成请求头"""
        return {
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "Origin": "http://www.cqsgczjxx.org",
            "Referer": "http://www.cqsgczjxx.org/Pages/CQZJW/priceInformation.aspx",
            "User-Agent": self.ua.random,
            "X-Requested-With": "XMLHttpRequest"
        }
    
    def fetch_data(self, city, year, month):
        """获取单个城市某月的数据"""
        month_str = str(month).zfill(2)
        
        data = {
            "period": f"{year}{month_str}月",
            "area": city,
            "groupType": "区县材料价格",
            "classify": "",
            "priceType": "",
            "searchParam": "",
            "pageIndex": "1",
            "pageSize": "200",
            "option": "0",
            "token": ""
        }
        
        try:
            response = self.session.post(
                self.base_url,
                headers=self.get_headers(),
                data=data,
                timeout=30
            )
            response.raise_for_status()
            
            data_json = response.json()
            
            if 'Data' in data_json and '_Items' in data_json['Data']:
                items = data_json['Data']['_Items']
                return self.parse_items(items, city, year, month)
            else:
                logging.warning(f"{year}{month_str}月, {city}没有返回数据")
                return []
                
        except requests.exceptions.RequestException as e:
            logging.error(f"请求失败 {city} {year}{month_str}月: {e}")
            return []
        except Exception as e:
            logging.error(f"解析失败 {city} {year}{month_str}月: {e}")
            return []
    
    def parse_items(self, items, city, year, month):
        """解析数据项"""
        parsed_data = []
        for item in items:
            try:
                parsed_item = {
                    'year': year,
                    'month': month,
                    'area': item.get('Area', city),
                    'name': item.get('Name', ''),
                    'model': item.get('Model', ''),
                    'unit': item.get('Unit', ''),
                    'taxPrice': item.get('TaxPrice', 0),
                    'noTaxPrice': item.get('NoTaxPrice', 0),
                    'timestamp': time.time()
                }
                parsed_data.append(parsed_item)
            except Exception as e:
                logging.error(f"解析数据项失败: {e}, 数据: {item}")
                continue
        
        return parsed_data
    
    def save_to_csv(self, data, filename):
        """保存数据到CSV文件"""
        if not data:
            return
        
        with open(filename, 'a+', encoding='utf-8-sig', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            
            # 如果文件为空,写入表头
            if f.tell() == 0:
                writer.writeheader()
            
            writer.writerows(data)
    
    def run_spider(self, start_year=2019, end_year=2024):
        """运行爬虫"""
        logging.info("开始运行爬虫...")
        
        # 使用线程池提高效率
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = []
            
            for city in self.citys:
                for year in range(start_year, end_year):
                    for month in range(1, 13):
                        future = executor.submit(self.fetch_data, city, year, month)
                        futures.append(future)
            
            # 收集结果
            for future in futures:
                try:
                    data = future.result()
                    if data:
                        self.data_buffer.extend(data)
                        
                        # 缓冲区满时保存数据
                        if len(self.data_buffer) >= self.buffer_size:
                            self.save_to_csv(self.data_buffer, 'construction_prices.csv')
                            self.data_buffer.clear()
                            
                except Exception as e:
                    logging.error(f"处理数据失败: {e}")
        
        # 保存剩余数据
        if self.data_buffer:
            self.save_to_csv(self.data_buffer, 'construction_prices.csv')
        
        logging.info("爬虫运行完成!")

# 使用示例
if __name__ == "__main__":
    spider = ConstructionPriceSpider()
    spider.run_spider()

爬取策略详解:

  • 多线程并发爬取: 使用ThreadPoolExecutor提高爬取效率
  • 智能请求频率控制: 动态调整请求间隔,避免被封IP
  • 异常处理和重试机制: 完善的错误处理和日志记录
  • 数据完整性验证: 数据格式验证和异常数据过滤
  • 反爬虫策略应对: 动态User-Agent、请求头轮换、代理IP池
  • 数据缓冲机制: 批量保存数据,提高I/O效率

3. 数据模型设计

完整的数据库模型
from datetime import datetime
from ext import db
from sqlalchemy import Index

class BaseModel(db.Model):
    """基础模型类"""
    __abstract__ = True
    
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    def save(self):
        """保存到数据库"""
        try:
            db.session.add(self)
            db.session.commit()
            return True
        except Exception as e:
            db.session.rollback()
            print(f"保存失败: {e}")
            return False
    
    def delete(self):
        """从数据库删除"""
        try:
            db.session.delete(self)
            db.session.commit()
            return True
        except Exception as e:
            db.session.rollback()
            print(f"删除失败: {e}")
            return False

class LandscapingPrice(BaseModel):
    """园林绿化工程材料造价"""
    __tablename__ = 'landscaping_price'
    
    # 基础字段
    code = db.Column(db.BigInteger, nullable=False, comment='材料编码')
    year = db.Column(db.String(255), nullable=False, comment='年份')
    month = db.Column(db.String(255), nullable=False, comment='月份')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.String(255), nullable=False, comment='含税价格')
    noTaxPrice = db.Column(db.String(255), nullable=False, comment='不含税价格')
    
    # 园林绿化特有字段
    family = db.Column(db.String(255), nullable=False, comment='植物科属')
    height = db.Column(db.String(255), nullable=False, comment='高度')
    trunkDiameter = db.Column(db.String(255), nullable=False, comment='胸径')
    topDiameter = db.Column(db.String(255), nullable=False, comment='冠幅')
    branchHeight = db.Column(db.String(255), nullable=False, comment='分枝点高度')
    remark = db.Column(db.String(255), nullable=False, comment='备注')
    
    # 创建索引提高查询性能
    __table_args__ = (
        Index('idx_landscaping_year_month', 'year', 'month'),
        Index('idx_landscaping_name', 'name'),
        Index('idx_landscaping_code', 'code'),
    )
    
    def __repr__(self):
        return f"<LandscapingPrice(id={self.id}, name={self.name}, model={self.model})>"
    
    @property
    def price_difference(self):
        """计算含税价与不含税价的差值"""
        try:
            tax_price = float(self.taxPrice) if self.taxPrice else 0
            no_tax_price = float(self.noTaxPrice) if self.noTaxPrice else 0
            return tax_price - no_tax_price
        except (ValueError, TypeError):
            return 0
    
    @property
    def tax_rate(self):
        """计算税率"""
        try:
            if self.price_difference > 0 and float(self.noTaxPrice) > 0:
                return (self.price_difference / float(self.noTaxPrice)) * 100
            return 0
        except (ValueError, TypeError):
            return 0

class ConstructionPrice(BaseModel):
    """建安工程材料造价"""
    __tablename__ = 'construction_price'
    
    # 基础字段
    code = db.Column(db.BigInteger, nullable=False, comment='材料编码')
    year = db.Column(db.Integer, nullable=False, comment='年份')
    month = db.Column(db.Integer, nullable=False, comment='月份')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='含税价格')
    noTaxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='不含税价格')
    remark = db.Column(db.String(255), nullable=False, comment='备注')
    
    # 创建索引
    __table_args__ = (
        Index('idx_construction_year_month', 'year', 'month'),
        Index('idx_construction_name', 'name'),
        Index('idx_construction_code', 'code'),
        Index('idx_construction_price', 'taxPrice'),
    )
    
    def __repr__(self):
        return f"<ConstructionPrice(id={self.id}, name={self.name}, model={self.model})>"
    
    @property
    def price_difference(self):
        """计算含税价与不含税价的差值"""
        return self.taxPrice - self.noTaxPrice
    
    @property
    def tax_rate(self):
        """计算税率"""
        if self.noTaxPrice > 0:
            return (self.price_difference / self.noTaxPrice) * 100
        return 0
    
    @classmethod
    def get_price_trend(cls, name, start_year, end_year):
        """获取指定材料的价格趋势"""
        return cls.query.filter(
            cls.name == name,
            cls.year >= start_year,
            cls.year <= end_year
        ).order_by(cls.year, cls.month).all()
    
    @classmethod
    def get_average_price_by_year(cls, year):
        """获取指定年份的平均价格"""
        result = db.session.query(
            cls.name,
            db.func.avg(cls.taxPrice).label('avg_price'),
            db.func.count(cls.id).label('count')
        ).filter(cls.year == year).group_by(cls.name).all()
        return result

class NewMaterialPriceModel(BaseModel):
    """新材料信息价"""
    __tablename__ = 'new_material_price'
    
    code = db.Column(db.BigInteger, nullable=False, comment='材料编码')
    year = db.Column(db.Integer, nullable=False, comment='年份')
    quarter = db.Column(db.Integer, nullable=False, comment='季度')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='含税价格')
    noTaxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='不含税价格')
    regCode = db.Column(db.String(255), nullable=False, comment='注册编码')
    supplier = db.Column(db.String(255), nullable=False, comment='供应商')
    remark = db.Column(db.String(255), nullable=False, comment='备注')
    
    __table_args__ = (
        Index('idx_new_material_year_quarter', 'year', 'quarter'),
        Index('idx_new_material_name', 'name'),
        Index('idx_new_material_supplier', 'supplier'),
    )

class GreenPriceModel(BaseModel):
    """绿色节能建筑材料价格"""
    __tablename__ = 'green_price'
    
    code = db.Column(db.BigInteger, nullable=False, comment='材料编码')
    year = db.Column(db.Integer, nullable=False, comment='年份')
    month = db.Column(db.Integer, nullable=False, comment='月份')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='含税价格')
    noTaxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='不含税价格')
    remark = db.Column(db.String(255), nullable=False, comment='备注')
    
    __table_args__ = (
        Index('idx_green_year_month', 'year', 'month'),
        Index('idx_green_name', 'name'),
    )

class PrefabricatedPriceModel(BaseModel):
    """装配式建筑工程成品构件价"""
    __tablename__ = 'prefabricated_price'
    
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    code = db.Column(db.BigInteger, nullable=False, comment='材料编码')
    year = db.Column(db.Integer, nullable=False, comment='年份')
    quarter = db.Column(db.Integer, nullable=False, comment='季度')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='含税价格')
    noTaxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='不含税价格')
    remark = db.Column(db.Text, nullable=False, comment='备注')
    
    __table_args__ = (
        Index('idx_prefabricated_year_quarter', 'year', 'quarter'),
        Index('idx_prefabricated_name', 'name'),
    )

class RailTransitPriceModel(BaseModel):
    """城市轨道交通工程材料价"""
    __tablename__ = 'rail_transit_price'
    
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    code = db.Column(db.BigInteger, nullable=False, comment='材料编码')
    year = db.Column(db.Integer, nullable=False, comment='年份')
    quarter = db.Column(db.Integer, nullable=False, comment='季度')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='含税价格')
    noTaxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='不含税价格')
    remark = db.Column(db.String(255), nullable=False, comment='备注')
    
    __table_args__ = (
        Index('idx_rail_transit_year_quarter', 'year', 'quarter'),
        Index('idx_rail_transit_name', 'name'),
    )

class DistrictPriceModel(BaseModel):
    """区县主要材料信息价"""
    __tablename__ = 'district_price'
    
    code = db.Column(db.String(255), nullable=False, comment='材料编码')
    area = db.Column(db.String(255), nullable=False, comment='区域')
    year = db.Column(db.Integer, nullable=False, comment='年份')
    month = db.Column(db.Integer, nullable=False, comment='月份')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='含税价格')
    noTaxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='不含税价格')
    remark = db.Column(db.String(255), nullable=False, comment='备注')
    
    __table_args__ = (
        Index('idx_district_area_year_month', 'area', 'year', 'month'),
        Index('idx_district_name', 'name'),
    )

class ReadyMixedPriceModel(BaseModel):
    """重庆预拌砂浆信息价"""
    __tablename__ = 'ready_mixed_price'
    
    code = db.Column(db.String(255), nullable=False, comment='材料编码')
    area = db.Column(db.String(255), nullable=False, comment='区域')
    year = db.Column(db.Integer, nullable=False, comment='年份')
    month = db.Column(db.Integer, nullable=False, comment='月份')
    name = db.Column(db.String(255), nullable=False, comment='材料名称')
    model = db.Column(db.String(255), nullable=False, comment='规格型号')
    unit = db.Column(db.String(255), nullable=False, comment='单位')
    taxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='含税价格')
    noTaxPrice = db.Column(db.Float, nullable=False, default=0.0, comment='不含税价格')
    remark = db.Column(db.String(255), nullable=False, comment='备注')
    
    __table_args__ = (
        Index('idx_ready_mixed_area_year_month', 'area', 'year', 'month'),
        Index('idx_ready_mixed_name', 'name'),
    )

数据模型设计特点:

  • 继承基础模型: 所有模型继承BaseModel,统一管理创建时间和更新时间
  • 完善的索引设计: 为常用查询字段创建索引,提高查询性能
  • 数据验证: 使用nullable=False等约束确保数据完整性
  • 计算属性: 通过@property装饰器提供计算字段,如税率、价格差值等
  • 类方法: 提供常用的查询方法,如价格趋势分析、年度平均价格等
  • 注释完整: 每个字段都有详细的中文注释,便于理解和维护

4. 数据可视化分析

价格趋势分析API实现
@bp.route('/get_price_data0', methods=['POST'])
def get_price_data0():
    """获取价格数据用于图表展示"""
    try:
        # 获取表单数据
        model_name = request.form['model']
        year = request.form['year']
        month_or_quarter = request.form.get('month') or request.form.get('quarter')
        
        # 数据验证
        if not all([model_name, year, month_or_quarter]):
            return jsonify({'error': '参数不完整'}), 400
        
        # 根据模型名称动态选择数据表
        model_mapping = {
            'landscaping_price': LandscapingPrice,
            'construction_price': ConstructionPrice,
            'new_material_price': NewMaterialPriceModel,
            'green_price': GreenPriceModel,
            'prefabricated_price': PrefabricatedPriceModel,
            'rail_transit_price': RailTransitPriceModel,
            'district_price': DistrictPriceModel,
            'ready_mixed_price': ReadyMixedPriceModel
        }
        
        if model_name not in model_mapping:
            return jsonify({'error': '无效的模型名称'}), 400
        
        model = model_mapping[model_name]
        
        # 构建查询条件
        if model_name in ['new_material_price', 'prefabricated_price', 'rail_transit_price']:
            filter_args = {'year': int(year), 'quarter': int(month_or_quarter)}
        else:
            filter_args = {'year': int(year), 'month': int(month_or_quarter)}
        
        # 查询数据
        query = model.query.filter_by(**filter_args).all()
        
        if not query:
            return jsonify({'data': [], 'message': '未找到相关数据'})
        
        # 按价格排序并限制数量
        query_sorted = sorted(query, key=lambda x: x.taxPrice, reverse=True)[:20]
        
        # 准备图表数据
        data = []
        for item in query_sorted:
            data.append({
                'name': item.name,
                'price': float(item.taxPrice),
                'unit': item.unit,
                'model': item.model,
                'no_tax_price': float(item.noTaxPrice) if hasattr(item, 'noTaxPrice') else 0
            })
        
        return jsonify({
            'code': 200,
            'data': data,
            'total': len(query),
            'message': '数据获取成功'
        })
        
    except Exception as e:
        logging.error(f"获取价格数据失败: {e}")
        return jsonify({'error': f'服务器错误: {str(e)}'}), 500
多维度数据分析功能
@bp.route('/get_material_type_data', methods=['POST'])
def get_material_type_data():
    """获取材料类型分布数据"""
    try:
        model_name = request.form['model']
        year = request.form['year']
        
        # 模型映射
        model_mapping = {
            'landscaping_price': LandscapingPrice,
            'construction_price': ConstructionPrice,
            'new_material_price': NewMaterialPriceModel,
            'green_price': GreenPriceModel,
            'prefabricated_price': PrefabricatedPriceModel,
            'rail_transit_price': RailTransitPriceModel,
            'district_price': DistrictPriceModel,
            'ready_mixed_price': ReadyMixedPriceModel
        }
        
        if model_name not in model_mapping:
            return jsonify({'error': '无效的模型名称'}), 400
        
        model = model_mapping[model_name]
        
        # 查询指定年份的数据
        query = model.query.filter_by(year=int(year)).all()
        
        if not query:
            return jsonify({'data': [], 'message': '未找到相关数据'})
        
        # 按材料名称分组统计
        material_stats = {}
        for item in query:
            name = item.name
            if name not in material_stats:
                material_stats[name] = {
                    'count': 0,
                    'total_price': 0,
                    'avg_price': 0,
                    'min_price': float('inf'),
                    'max_price': 0
                }
            
            price = float(item.taxPrice)
            material_stats[name]['count'] += 1
            material_stats[name]['total_price'] += price
            material_stats[name]['min_price'] = min(material_stats[name]['min_price'], price)
            material_stats[name]['max_price'] = max(material_stats[name]['max_price'], price)
        
        # 计算平均价格
        for name, stats in material_stats.items():
            stats['avg_price'] = stats['total_price'] / stats['count']
        
        # 转换为图表数据格式
        chart_data = []
        for name, stats in material_stats.items():
            chart_data.append({
                'name': name,
                'count': stats['count'],
                'avg_price': round(stats['avg_price'], 2),
                'min_price': stats['min_price'],
                'max_price': stats['max_price'],
                'total_price': round(stats['total_price'], 2)
            })
        
        # 按平均价格排序
        chart_data.sort(key=lambda x: x['avg_price'], reverse=True)
        
        return jsonify({
            'code': 200,
            'data': chart_data[:50],  # 限制返回前50个
            'total': len(chart_data),
            'message': '数据获取成功'
        })
        
    except Exception as e:
        logging.error(f"获取材料类型数据失败: {e}")
        return jsonify({'error': f'服务器错误: {str(e)}'}), 500
区县价格对比分析
@bp.route('/district_price_analysis', methods=['GET', 'POST'])
def district_price_analysis():
    """区县价格对比分析"""
    if request.method == 'POST':
        try:
            year = int(request.form['year'])
            month = int(request.form['month'])
            material_name = request.form.get('material_name', '')
            
            # 构建查询条件
            filter_args = {'year': year, 'month': month}
            if material_name:
                filter_args['name'] = material_name
            
            # 查询区县价格数据
            query = DistrictPriceModel.query.filter_by(**filter_args).all()
            
            if not query:
                return jsonify({'data': [], 'message': '未找到相关数据'})
            
            # 按区域分组统计
            area_stats = {}
            for item in query:
                area = item.area
                if area not in area_stats:
                    area_stats[area] = {
                        'count': 0,
                        'total_price': 0,
                        'avg_price': 0,
                        'materials': []
                    }
                
                price = float(item.taxPrice)
                area_stats[area]['count'] += 1
                area_stats[area]['total_price'] += price
                area_stats[area]['materials'].append({
                    'name': item.name,
                    'model': item.model,
                    'price': price,
                    'unit': item.unit
                })
            
            # 计算平均价格
            for area, stats in area_stats.items():
                stats['avg_price'] = stats['total_price'] / stats['count']
            
            # 转换为图表数据
            chart_data = []
            for area, stats in area_stats.items():
                chart_data.append({
                    'area': area,
                    'count': stats['count'],
                    'avg_price': round(stats['avg_price'], 2),
                    'total_price': round(stats['total_price'], 2)
                })
            
            # 按平均价格排序
            chart_data.sort(key=lambda x: x['avg_price'], reverse=True)
            
            return jsonify({
                'code': 200,
                'data': chart_data,
                'message': '数据获取成功'
            })
            
        except Exception as e:
            logging.error(f"区县价格分析失败: {e}")
            return jsonify({'error': f'服务器错误: {str(e)}'}), 500
    
    return render_template('chart3.html')
价格趋势预测分析
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import numpy as np

class PricePrediction:
    """价格预测分析类"""
    
    def __init__(self):
        self.model = LinearRegression()
        self.scaler = StandardScaler()
        self.is_trained = False
    
    def prepare_data(self, price_data):
        """准备训练数据"""
        if not price_data:
            return None, None
        
        # 提取时间和价格数据
        time_data = []
        price_values = []
        
        for item in price_data:
            # 将年月转换为数值特征
            time_feature = item.year * 12 + item.month
            time_data.append([time_feature])
            price_values.append(float(item.taxPrice))
        
        return np.array(time_data), np.array(price_values)
    
    def train_model(self, price_data):
        """训练预测模型"""
        X, y = self.prepare_data(price_data)
        
        if X is None or len(X) < 3:
            return False, "数据量不足,无法训练模型"
        
        try:
            # 标准化特征
            X_scaled = self.scaler.fit_transform(X)
            
            # 训练模型
            self.model.fit(X_scaled, y)
            self.is_trained = True
            
            return True, "模型训练成功"
            
        except Exception as e:
            return False, f"模型训练失败: {str(e)}"
    
    def predict_price(self, year, month, months_ahead=3):
        """预测未来价格"""
        if not self.is_trained:
            return None, "模型未训练"
        
        try:
            # 准备预测数据
            future_times = []
            for i in range(1, months_ahead + 1):
                future_month = month + i
                future_year = year
                if future_month > 12:
                    future_month -= 12
                    future_year += 1
                
                time_feature = future_year * 12 + future_month
                future_times.append([time_feature])
            
            future_times = np.array(future_times)
            future_times_scaled = self.scaler.transform(future_times)
            
            # 预测价格
            predictions = self.model.predict(future_times_scaled)
            
            # 格式化预测结果
            results = []
            for i, pred in enumerate(predictions):
                future_month = month + i + 1
                future_year = year
                if future_month > 12:
                    future_month -= 12
                    future_year += 1
                
                results.append({
                    'year': future_year,
                    'month': future_month,
                    'predicted_price': round(max(0, pred), 2),
                    'confidence': 0.85  # 置信度(简化处理)
                })
            
            return results, "预测完成"
            
        except Exception as e:
            return None, f"预测失败: {str(e)}"

# 在路由中使用价格预测
@bp.route('/price_prediction', methods=['POST'])
def price_prediction():
    """价格预测接口"""
    try:
        model_name = request.form['model']
        material_name = request.form['material_name']
        year = int(request.form['year'])
        month = int(request.form['month'])
        months_ahead = int(request.form.get('months_ahead', 3))
        
        # 获取历史价格数据
        model_mapping = {
            'landscaping_price': LandscapingPrice,
            'construction_price': ConstructionPrice,
            'new_material_price': NewMaterialPriceModel,
            'green_price': GreenPriceModel,
            'prefabricated_price': PrefabricatedPriceModel,
            'rail_transit_price': RailTransitPriceModel,
            'district_price': DistrictPriceModel,
            'ready_mixed_price': ReadyMixedPriceModel
        }
        
        if model_name not in model_mapping:
            return jsonify({'error': '无效的模型名称'}), 400
        
        model = model_mapping[model_name]
        
        # 查询历史数据(最近24个月)
        start_year = year - 2
        if month <= 2:
            start_year -= 1
            start_month = month + 10
        else:
            start_month = month - 2
        
        historical_data = model.query.filter(
            model.name == material_name,
            db.or_(
                db.and_(model.year == start_year, model.month >= start_month),
                db.and_(model.year > start_year, model.year < year),
                db.and_(model.year == year, model.month <= month)
            )
        ).order_by(model.year, model.month).all()
        
        if len(historical_data) < 6:
            return jsonify({'error': '历史数据不足,无法进行预测'})
        
        # 创建预测模型
        predictor = PricePrediction()
        success, message = predictor.train_model(historical_data)
        
        if not success:
            return jsonify({'error': message})
        
        # 进行预测
        predictions, message = predictor.predict_price(year, month, months_ahead)
        
        if predictions is None:
            return jsonify({'error': message})
        
        return jsonify({
            'code': 200,
            'data': {
                'material_name': material_name,
                'current_year': year,
                'current_month': month,
                'predictions': predictions
            },
            'message': '预测完成'
        })
        
    except Exception as e:
        logging.error(f"价格预测失败: {e}")
        return jsonify({'error': f'服务器错误: {str(e)}'}), 500
多维度分析功能详解
  • 时间维度: 年度、季度、月度价格变化趋势,支持历史数据对比
  • 地域维度: 不同区县价格对比分析,识别价格差异和区域特点
  • 材料维度: 材料类型分类统计,分析材料价格分布规律
  • 价格维度: 含税价与不含税价对比,税率计算和分析
  • 趋势维度: 基于机器学习的价格波动趋势预测
  • 统计维度: 价格分布、中位数、标准差等统计指标
数据可视化技术特点
  • 响应式图表: 支持不同屏幕尺寸的自适应显示
  • 交互式操作: 支持图表缩放、数据筛选、详情查看
  • 实时更新: 数据变化时图表自动刷新
  • 多图表联动: 不同图表间数据关联和联动展示
  • 导出功能: 支持图表数据导出为Excel、PDF等格式

📊 可视化展示

1. 数据概览仪表板

  • 用户数量统计
  • 各类材料价格数据量统计
  • 系统整体数据规模展示

2. 价格趋势图表

  • 折线图:展示价格随时间变化趋势
  • 柱状图:对比不同材料价格水平
  • 饼图:材料类型分布统计
  • 热力图:价格地域分布可视化

3. 交互式查询界面

  • 多条件筛选查询
  • 实时数据更新
  • 响应式图表展示
  • 数据导出功能

🚀 系统部署与优化

部署架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Nginx         │    │   Gunicorn      │    │   MySQL         │
│   (反向代理)     │◄──►│   (WSGI服务器)   │◄──►│   (数据库)       │
│   - 负载均衡     │    │   - 多进程      │    │   - 主从复制     │
│   - 静态文件     │    │   - 进程管理    │    │   - 读写分离     │
│   - SSL终止     │    │   - 健康检查    │    │   - 备份恢复     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         │              ┌─────────────────┐              │
         │              │   Redis         │              │
         └─────────────►│   (缓存)        │              │
                        │   - 会话存储    │              │
                        │   - 数据缓存    │              │
                        │   - 限流控制    │              │
                        └─────────────────┘              │
                                                         │
                        ┌─────────────────┐              │
                        │   Celery        │              │
                        │   (异步任务)     │              │
                        │   - 数据爬取    │              │
                        │   - 报表生成    │              │
                        │   - 邮件发送    │              │
                        └─────────────────┘              │

生产环境部署配置

1. Gunicorn配置
# gunicorn.conf.py
import multiprocessing
import os

# 服务器配置
bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "gevent"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100

# 进程配置
preload_app = True
daemon = False
pidfile = "/var/run/gunicorn.pid"
user = "www-data"
group = "www-data"

# 日志配置
accesslog = "/var/log/gunicorn/access.log"
errorlog = "/var/log/gunicorn/error.log"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'

# 超时配置
timeout = 30
keepalive = 2
graceful_timeout = 30

# 安全配置
limit_request_line = 4094
limit_request_fields = 100
limit_request_field_size = 8190
2. Nginx配置
# /etc/nginx/sites-available/construction-price
upstream flask_app {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    keepalive 32;
}

server {
    listen 80;
    server_name your-domain.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name your-domain.com;
    
    # SSL配置
    ssl_certificate /etc/letsencrypt/live/your-domain.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/your-domain.com/privkey.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384;
    ssl_prefer_server_ciphers off;
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    
    # 安全头
    add_header X-Frame-Options DENY;
    add_header X-Content-Type-Options nosniff;
    add_header X-XSS-Protection "1; mode=block";
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains";
    
    # 静态文件处理
    location /static/ {
        alias /var/www/construction-price/static/;
        expires 1y;
        add_header Cache-Control "public, immutable";
        gzip_static on;
    }
    
    # 媒体文件处理
    location /media/ {
        alias /var/www/construction-price/media/;
        expires 1y;
        add_header Cache-Control "public";
    }
    
    # 主应用代理
    location / {
        proxy_pass http://flask_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_redirect off;
        
        # 超时配置
        proxy_connect_timeout 30s;
        proxy_send_timeout 30s;
        proxy_read_timeout 30s;
        
        # 缓冲配置
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;
    }
    
    # 健康检查
    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }
}
3. Supervisor配置
# /etc/supervisor/conf.d/construction-price.conf
[program:construction-price]
command=/var/www/construction-price/venv/bin/gunicorn -c gunicorn.conf.py app:app
directory=/var/www/construction-price
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor/construction-price.log
environment=FLASK_ENV="production"

性能优化策略

1. 数据库优化
# 数据库连接池配置
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# 生产环境数据库配置
DATABASE_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'database': 'design_price1',
    'username': 'root',
    'password': '123456',
    'charset': 'utf8mb4'
}

# 创建优化的数据库引擎
engine = create_engine(
    f"mysql+pymysql://{DATABASE_CONFIG['username']}:{DATABASE_CONFIG['password']}"
    f"@{DATABASE_CONFIG['host']}:{DATABASE_CONFIG['port']}/{DATABASE_CONFIG['database']}"
    f"?charset={DATABASE_CONFIG['charset']}",
    poolclass=QueuePool,
    pool_size=20,  # 连接池大小
    max_overflow=30,  # 最大溢出连接数
    pool_pre_ping=True,  # 连接前ping检查
    pool_recycle=3600,  # 连接回收时间(秒)
    echo=False  # 生产环境关闭SQL日志
)

# 数据库索引优化
class DatabaseOptimizer:
    """数据库优化工具类"""
    
    @staticmethod
    def create_indexes():
        """创建必要的数据库索引"""
        with engine.connect() as conn:
            # 价格查询索引
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_price_year_month 
                ON construction_price(year, month)
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_price_name 
                ON construction_price(name)
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_price_tax_price 
                ON construction_price(taxPrice)
            """)
            
            # 复合索引
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_price_composite 
                ON construction_price(year, month, name, taxPrice)
            """)
            
            conn.commit()
    
    @staticmethod
    def analyze_tables():
        """分析表统计信息"""
        tables = [
            'construction_price', 'landscaping_price', 'new_material_price',
            'green_price', 'prefabricated_price', 'rail_transit_price',
            'district_price', 'ready_mixed_price'
        ]
        
        with engine.connect() as conn:
            for table in tables:
                conn.execute(f"ANALYZE TABLE {table}")
            conn.commit()
    
    @staticmethod
    def optimize_queries():
        """查询优化建议"""
        with engine.connect() as conn:
            # 查看慢查询
            result = conn.execute("""
                SELECT * FROM mysql.slow_log 
                WHERE start_time > DATE_SUB(NOW(), INTERVAL 1 DAY)
                ORDER BY query_time DESC 
                LIMIT 10
            """)
            
            for row in result:
                print(f"慢查询: {row.query_time}s - {row.sql_text[:100]}...")
2. Redis缓存策略
import redis
import json
import pickle
from functools import wraps
import hashlib

class RedisCache:
    """Redis缓存管理类"""
    
    def __init__(self, host='localhost', port=6379, db=0, password=None):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True
        )
    
    def cache(self, key_prefix, expire_time=3600):
        """缓存装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # 生成缓存键
                cache_key = self._generate_key(key_prefix, func.__name__, args, kwargs)
                
                # 尝试从缓存获取
                cached_data = self.get(cache_key)
                if cached_data is not None:
                    return cached_data
                
                # 执行函数并缓存结果
                result = func(*args, **kwargs)
                self.set(cache_key, result, expire_time)
                return result
            
            return wrapper
        return decorator
    
    def _generate_key(self, prefix, func_name, args, kwargs):
        """生成缓存键"""
        key_data = f"{prefix}:{func_name}:{str(args)}:{str(sorted(kwargs.items()))}"
        return hashlib.md5(key_data.encode()).hexdigest()
    
    def get(self, key):
        """获取缓存数据"""
        try:
            data = self.redis_client.get(key)
            if data:
                return pickle.loads(data)
            return None
        except Exception as e:
            print(f"Redis获取缓存失败: {e}")
            return None
    
    def set(self, key, value, expire_time=3600):
        """设置缓存数据"""
        try:
            data = pickle.dumps(value)
            self.redis_client.setex(key, expire_time, data)
            return True
        except Exception as e:
            print(f"Redis设置缓存失败: {e}")
            return False
    
    def delete(self, key):
        """删除缓存"""
        try:
            self.redis_client.delete(key)
            return True
        except Exception as e:
            print(f"Redis删除缓存失败: {e}")
            return False
    
    def clear_pattern(self, pattern):
        """清除匹配模式的缓存"""
        try:
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
            return True
        except Exception as e:
            print(f"Redis清除缓存失败: {e}")
            return False

# 使用缓存的示例
cache = RedisCache()

@cache.cache("price_data", 1800)  # 缓存30分钟
def get_price_data_cached(model_name, year, month):
    """带缓存的价格数据查询"""
    # 这里是原来的查询逻辑
    pass
3. 异步任务处理
from celery import Celery
from celery.schedules import crontab
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Celery配置
celery_app = Celery('construction_price')
celery_app.config_from_object('celeryconfig')

@celery_app.task(bind=True, max_retries=3)
def scrape_construction_prices(self, city, year, month):
    """异步爬取建筑价格数据"""
    try:
        # 爬取逻辑
        spider = ConstructionPriceSpider()
        data = spider.fetch_data(city, year, month)
        
        if data:
            # 保存到数据库
            for item in data:
                price_model = ConstructionPrice(**item)
                price_model.save()
            
            return f"成功爬取 {city} {year}{month}月数据,共{len(data)}条"
        else:
            return f"{city} {year}{month}月无数据"
            
    except Exception as exc:
        # 重试机制
        raise self.retry(exc=exc, countdown=60)

@celery_app.task
def send_price_report_email(user_email, report_data):
    """异步发送价格报告邮件"""
    try:
        # 邮件发送逻辑
        msg = MIMEMultipart()
        msg['From'] = 'noreply@construction-price.com'
        msg['To'] = user_email
        msg['Subject'] = '建筑材料价格报告'
        
        # 构建邮件内容
        body = f"""
        <html>
        <body>
            <h2>建筑材料价格报告</h2>
            <p>您好!</p>
            <p>以下是您请求的价格报告:</p>
            {report_data}
            <p>如有疑问,请联系客服。</p>
        </body>
        </html>
        """
        
        msg.attach(MIMEText(body, 'html'))
        
        # 发送邮件
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login('your-email@gmail.com', 'your-password')
        server.send_message(msg)
        server.quit()
        
        return f"邮件发送成功: {user_email}"
        
    except Exception as e:
        return f"邮件发送失败: {str(e)}"

# 定时任务
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每天凌晨2点爬取最新数据
    sender.add_periodic_task(
        crontab(hour=2, minute=0),
        scrape_daily_prices.s(),
        name='daily-price-scraping'
    )
    
    # 每周一生成周报
    sender.add_periodic_task(
        crontab(day_of_week=1, hour=9, minute=0),
        generate_weekly_report.s(),
        name='weekly-report-generation'
    )

@celery_app.task
def scrape_daily_prices():
    """每日价格爬取任务"""
    from datetime import datetime
    
    today = datetime.now()
    year = today.year
    month = today.month
    
    # 爬取所有城市的数据
    cities = ['主城区', '万州区', '涪陵区']  # 简化城市列表
    
    for city in cities:
        scrape_construction_prices.delay(city, year, month)
    
    return f"启动 {len(cities)} 个城市的每日爬取任务"

@celery_app.task
def generate_weekly_report():
    """生成周报任务"""
    # 生成周报逻辑
    pass
4. 性能监控与日志
import logging
import time
from functools import wraps
from flask import request, g
import psutil
import os

# 性能监控装饰器
def performance_monitor(func):
    """性能监控装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        start_memory = psutil.Process(os.getpid()).memory_info().rss
        
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            end_time = time.time()
            end_memory = psutil.Process(os.getpid()).memory_info().rss
            
            execution_time = end_time - start_time
            memory_usage = end_memory - start_memory
            
            # 记录性能指标
            logging.info(f"函数: {func.__name__}, "
                        f"执行时间: {execution_time:.4f}s, "
                        f"内存使用: {memory_usage / 1024 / 1024:.2f}MB")
            
            # 如果执行时间过长,记录警告
            if execution_time > 1.0:
                logging.warning(f"函数 {func.__name__} 执行时间过长: {execution_time:.4f}s")
    
    return wrapper

# 请求日志中间件
@app.before_request
def before_request():
    g.start_time = time.time()
    g.start_memory = psutil.Process(os.getpid()).memory_info().rss

@app.after_request
def after_request(response):
    if hasattr(g, 'start_time'):
        execution_time = time.time() - g.start_time
        memory_usage = psutil.Process(os.getpid()).memory_info().rss - g.start_memory
        
        # 记录请求日志
        logging.info(f"请求: {request.method} {request.path}, "
                    f"状态码: {response.status_code}, "
                    f"执行时间: {execution_time:.4f}s, "
                    f"内存使用: {memory_usage / 1024 / 1024:.2f}MB")
        
        # 添加性能头
        response.headers['X-Execution-Time'] = str(execution_time)
        response.headers['X-Memory-Usage'] = str(memory_usage)
    
    return response

# 系统资源监控
class SystemMonitor:
    """系统资源监控类"""
    
    @staticmethod
    def get_system_info():
        """获取系统信息"""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        
        return {
            'cpu_percent': cpu_percent,
            'memory_percent': memory.percent,
            'memory_used': memory.used / 1024 / 1024 / 1024,  # GB
            'memory_total': memory.total / 1024 / 1024 / 1024,  # GB
            'disk_percent': disk.percent,
            'disk_used': disk.used / 1024 / 1024 / 1024,  # GB
            'disk_total': disk.total / 1024 / 1024 / 1024,  # GB
            'timestamp': time.time()
        }
    
    @staticmethod
    def check_system_health():
        """检查系统健康状态"""
        info = SystemMonitor.get_system_info()
        
        warnings = []
        if info['cpu_percent'] > 80:
            warnings.append(f"CPU使用率过高: {info['cpu_percent']}%")
        
        if info['memory_percent'] > 85:
            warnings.append(f"内存使用率过高: {info['memory_percent']}%")
        
        if info['disk_percent'] > 90:
            warnings.append(f"磁盘使用率过高: {info['disk_percent']}%")
        
        return {
            'healthy': len(warnings) == 0,
            'warnings': warnings,
            'info': info
        }

# 健康检查接口
@app.route('/health')
def health_check():
    """系统健康检查"""
    system_health = SystemMonitor.check_system_health()
    
    if system_health['healthy']:
        return jsonify({
            'status': 'healthy',
            'timestamp': time.time(),
            'system_info': system_health['info']
        }), 200
    else:
        return jsonify({
            'status': 'unhealthy',
            'warnings': system_health['warnings'],
            'timestamp': time.time(),
            'system_info': system_health['info']
        }), 503

性能优化策略总结

  • 数据库优化: 索引优化、查询语句优化、连接池管理
  • 缓存策略: Redis缓存热点数据、查询结果缓存、页面缓存
  • 异步处理: Celery处理耗时任务、邮件发送、数据爬取
  • 负载均衡: Nginx负载均衡、多进程部署、健康检查
  • 监控告警: 性能监控、系统资源监控、日志分析
  • CDN加速: 静态资源CDN分发、图片压缩、Gzip压缩

💡 项目特色与创新

1. 智能化数据采集

  • 自动化爬虫系统,减少人工干预
  • 智能反爬虫策略,提高数据获取成功率
  • 数据质量自动检测和清洗

2. 多维度数据分析

  • 支持时间、地域、材料类型等多维度分析
  • 实时数据更新和趋势预测
  • 个性化分析报告生成

3. 用户友好界面

  • 响应式设计,支持多设备访问
  • 直观的数据可视化展示
  • 便捷的数据查询和导出功能

4. 系统可扩展性

  • 模块化架构设计
  • 支持新数据源快速接入
  • 灵活的配置管理

🔍 技术难点与解决方案

1. 反爬虫策略应对

问题: 目标网站存在反爬虫机制
解决方案:

  • 动态User-Agent轮换
  • 请求频率控制
  • 代理IP池使用
  • 模拟真实用户行为

2. 大量数据处理

问题: 数据量大,查询性能差
解决方案:

  • 数据库索引优化
  • 分页查询实现
  • 数据缓存策略
  • 异步数据处理

3. 数据一致性保证

问题: 多源数据格式不统一
解决方案:

  • 统一数据模型设计
  • 数据清洗和标准化
  • 数据验证机制
  • 异常数据处理

🎯 未来发展规划

短期目标 (3-6个月)

  • 优化爬虫系统稳定性
  • 增加更多数据源支持
  • 完善数据可视化功能
  • 提升系统性能

中期目标 (6-12个月)

  • 集成机器学习算法
  • 实现价格预测功能
  • 开发移动端应用
  • 增加API接口服务

长期目标 (1-2年)

  • 构建行业标准数据库
  • 开发智能决策支持系统
  • 建立行业生态平台
  • 拓展到其他地区

📈 项目成果与影响

技术成果

  • 完整的造价数据采集系统
  • 高效的数据分析平台
  • 直观的可视化展示界面
  • 可扩展的系统架构

实际应用价值

  • 为工程造价提供数据支撑
  • 提高价格查询效率
  • 支持成本控制决策
  • 促进行业信息透明化

社会效益

  • 降低工程造价成本
  • 提高行业信息化水平
  • 促进市场公平竞争
  • 支持政府监管决策

🎓 总结与感悟

技术收获

通过本项目的开发,深入学习了Flask Web开发、数据库设计、数据爬取、数据可视化等多项技术。在实践中遇到了反爬虫、大数据处理、系统性能优化等技术挑战,通过查阅资料、学习新技术、不断调试优化,最终成功解决了这些问题。

项目管理经验

  • 需求分析的重要性:明确的功能需求是项目成功的基础
  • 架构设计的必要性:良好的系统架构能够支持项目的长期发展
  • 测试验证的关键性:充分的测试能够避免生产环境的问题
  • 文档记录的价值:完善的文档有助于项目的维护和传承

行业认知提升

通过深入接触工程造价行业,了解了材料价格信息对工程建设的重要性,认识到信息化技术在传统行业转型升级中的重要作用。同时也看到了数据驱动的决策支持系统在提高工作效率、降低运营成本方面的巨大潜力。

🔗 联系方式

码界筑梦坊 - 专注技术分享与项目实践 各平台同名 获取项目源码


本文详细介绍了造价信息可视化分析系统的技术实现,希望能为相关领域的开发者提供参考和启发。如有疑问或建议,欢迎通过上述平台与我交流讨论。

标签: #Flask #Python #数据爬取 #数据可视化 #造价系统 #Web开发 #数据分析 #MySQL #SQLAlchemy #Bootstrap


网站公告

今日签到

点亮在社区的每一天
去签到