【无标题】

发布于:2025-06-30 ⋅ 阅读:(15) ⋅ 点赞:(0)

Day58

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')

# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']  # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False  # 解决保存图像时负号'-'显示为方块的问题

# 1. 加载太阳黑子数据集
def load_sunspots_data():
    """加载太阳黑子数据集并返回时间序列"""
    # 使用statsmodels内置的太阳黑子数据集
    data = sm.datasets.sunspots.load_pandas()
    df = data.data
    # 设置年份为索引
    df['YEAR'] = pd.to_datetime(df['YEAR'], format='%Y')
    df.set_index('YEAR', inplace=True)
    return df

# 2. 数据可视化
def visualize_data(series):
    """可视化原始时间序列、分解图、ACF和PACF"""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # 原始时间序列
    axes[0, 0].plot(series)
    axes[0, 0].set_title('原始太阳黑子序列')
    
    # 分解图
    decomposition = seasonal_decompose(series, model='additive', period=11)  # 太阳黑子周期约为11年
    decomposition.plot()
    plt.tight_layout()
    
    # ACF图
    plot_acf(series, lags=40, ax=axes[0, 1])
    axes[0, 1].set_title('原始序列的ACF')
    
    # PACF图
    plot_pacf(series, lags=40, ax=axes[1, 0])
    axes[1, 0].set_title('原始序列的PACF')
    
    plt.tight_layout()
    plt.show()
    
    # 打印基本统计信息
    print(f"数据基本统计信息:")
    print(series.describe())

# 3. 平稳性检验
def test_stationarity(series, title='原始序列'):
    """进行ADF检验并输出结果"""
    print(f"\n{title}的ADF平稳性检验:")
    result = adfuller(series)
    print(f'ADF统计量: {result[0]:.4f}')
    print(f'p值: {result[1]:.4f}')
    print('临界值:')
    for key, value in result[4].items():
        print(f'   {key}: {value:.4f}')
    
    if result[1] <= 0.05:
        print("序列平稳(拒绝原假设,p值 <= 0.05)")
        return True
    else:
        print("序列非平稳(不能拒绝原假设,p值 > 0.05)")
        return False

# 4. 确定差分阶数d
def determine_differencing(series):
    """确定差分阶数d"""
    # 检验原始序列
    is_stationary = test_stationarity(series, '原始序列')
    d = 0
    
    if not is_stationary:
        # 一阶差分
        diff1 = series.diff().dropna()
        is_stationary = test_stationarity(diff1, '一阶差分序列')
        d = 1
        
        if not is_stationary:
            # 二阶差分
            diff2 = diff1.diff().dropna()
            is_stationary = test_stationarity(diff2, '二阶差分序列')
            d = 2
    
    print(f"\n确定差分阶数 d = {d}")
    
    # 可视化差分后的序列及其ACF、PACF
    if d > 0:
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        if d == 1:
            axes[0].plot(diff1)
            axes[0].set_title('一阶差分序列')
            plot_acf(diff1, lags=40, ax=axes[1])
            axes[1].set_title('一阶差分序列的ACF')
        else:
            axes[0].plot(diff2)
            axes[0].set_title('二阶差分序列')
            plot_acf(diff2, lags=40, ax=axes[1])
            axes[1].set_title('二阶差分序列的ACF')
        
        plt.tight_layout()
        plt.show()
    
    return d

# 5. 确定ARIMA的p和q参数
def determine_p_q(series, d):
    """基于差分后的序列确定p和q参数"""
    # 对序列进行差分
    if d == 0:
        diff_series = series
    elif d == 1:
        diff_series = series.diff().dropna()
    else:
        diff_series = series.diff().diff().dropna()
    
    # 绘制ACF和PACF图
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    plot_acf(diff_series, lags=40, ax=axes[0])
    axes[0].set_title('差分序列的ACF')
    plot_pacf(diff_series, lags=40, ax=axes[1])
    axes[1].set_title('差分序列的PACF')
    plt.tight_layout()
    plt.show()
    
    # 根据ACF和PACF图手动确定p和q
    print("\n请根据ACF和PACF图确定p和q的值:")
    print("  - PACF图中显著截断的滞后阶数为p")
    print("  - ACF图中显著截断的滞后阶数为q")
    
    # 这里使用自动化方法(AIC最小化)作为备选
    best_aic = float('inf')
    best_p = 0
    best_q = 0
    
    # 设置p和q的最大搜索范围
    max_p = 5
    max_q = 5
    
    print("\n正在搜索最优(p,q)参数组合...")
    for p in range(0, max_p + 1):
        for q in range(0, max_q + 1):
            try:
                model = ARIMA(series, order=(p, d, q))
                results = model.fit()
                if results.aic < best_aic:
                    best_aic = results.aic
                    best_p = p
                    best_q = q
                print(f'ARIMA({p},{d},{q}) - AIC: {results.aic:.2f}')
            except:
                continue
    
    print(f"\nAIC最优参数: p = {best_p}, q = {best_q} (AIC = {best_aic:.2f})")
    
    # 允许用户手动选择或使用AIC推荐值
    use_auto = input("\n是否使用AIC推荐的参数?(y/n): ").lower().strip()
    if use_auto == 'y':
        p, q = best_p, best_q
    else:
        p = int(input("请输入p值: "))
        q = int(input("请输入q值: "))
    
    print(f"\n确定最终参数: p = {p}, q = {q}")
    return p, q

# 6. 构建并训练ARIMA模型
def build_arima_model(series, p, d, q):
    """构建并训练ARIMA模型"""
    print(f"\n构建ARIMA模型: ARIMA({p}, {d}, {q})")
    model = ARIMA(series, order=(p, d, q))
    model_fit = model.fit()
    
    print("\n模型摘要:")
    print(model_fit.summary())
    
    # 绘制残差图
    residuals = pd.DataFrame(model_fit.resid)
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # 残差时序图
    axes[0].plot(residuals)
    axes[0].set_title('模型残差')
    
    # 残差直方图
    residuals.hist(ax=axes[1])
    axes[1].set_title('残差直方图')
    
    plt.tight_layout()
    plt.show()
    
    # 残差的ACF和PACF图
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    plot_acf(residuals, lags=40, ax=axes[0])
    axes[0].set_title('残差的ACF')
    plot_pacf(residuals, lags=40, ax=axes[1])
    axes[1].set_title('残差的PACF')
    plt.tight_layout()
    plt.show()
    
    # Ljung-Box检验(残差白噪声检验)
    lb_test = sm.stats.acorr_ljungbox(residuals, lags=[10], return_df=True)
    print("\nLjung-Box检验(残差白噪声检验):")
    print(lb_test)
    
    # 正态性检验
    print("\n残差正态性检验:")
    print(f'偏度: {residuals.skew()[0]:.4f}')
    print(f'峰度: {residuals.kurt()[0]:.4f}')
    
    return model_fit

# 7. 模型预测
def make_predictions(model_fit, series, steps=30):
    """进行模型预测并可视化结果"""
    # 获取预测结果
    forecast = model_fit.get_forecast(steps=steps)
    forecast_mean = forecast.predicted_mean
    forecast_ci = forecast.conf_int(alpha=0.05)  # 95%置信区间
    
    # 生成预测日期
    last_date = series.index[-1]
    forecast_dates = pd.date_range(start=last_date + pd.DateOffset(years=1), periods=steps, freq='AS')
    
    # 可视化预测结果
    plt.figure(figsize=(14, 7))
    
    # 绘制历史数据
    plt.plot(series.index, series, label='历史数据')
    
    # 绘制预测数据
    plt.plot(forecast_dates, forecast_mean, 'r--', label='预测值')
    plt.fill_between(forecast_dates, forecast_ci.iloc[:, 0], forecast_ci.iloc[:, 1], color='r', alpha=0.1, label='95%置信区间')
    
    plt.title('太阳黑子数量预测')
    plt.xlabel('年份')
    plt.ylabel('太阳黑子数量')
    plt.legend()
    plt.grid(True)
    plt.show()
    
    # 打印预测结果
    print("\n未来30年太阳黑子数量预测:")
    forecast_df = pd.DataFrame({
        '年份': forecast_dates.year,
        '预测值': forecast_mean.values,
        '下限': forecast_ci.iloc[:, 0].values,
        '上限': forecast_ci.iloc[:, 1].values
    })
    print(forecast_df.round(2))
    
    return forecast_df

# 主函数
def main():
    print("===== 太阳黑子数据集ARIMA分析 =====")
    
    # 1. 加载数据
    df = load_sunspots_data()
    series = df['SUNACTIVITY']
    
    # 2. 数据可视化
    visualize_data(series)
    
    # 3. 确定差分阶数d
    d = determine_differencing(series)
    
    # 4. 确定p和q参数
    p, q = determine_p_q(series, d)
    
    # 5. 构建并训练ARIMA模型
    model_fit = build_arima_model(series, p, d, q)
    
    # 6. 模型预测
    forecast = make_predictions(model_fit, series, steps=30)
    
    print("\n===== ARIMA分析完成 =====")

if __name__ == "__main__":
    main()    

@浙大疏锦行