里昂惕夫矩阵:投入产出分析

发布于:2025-04-05 ⋅ 阅读:(13) ⋅ 点赞:(0)

基于python的里昂惕夫矩阵:投入产出分析

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, Dict

# ================== 中文字体配置 ==================
plt.rcParams['font.sans-serif'] = ['SimHei']  # Windows系统可用字体
# plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']  # MacOS系统可用字体
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题


# =================================================

class InputOutputAnalyzer:
    def __init__(self, Q: np.ndarray, FC: np.ndarray, Total: np.ndarray):
        """
        初始化投入产出分析器
        参数:
            Q : 中间产品矩阵(n x n)
            FC : 最终消费矩阵(n x m)
            Total : 总产出向量(n x 1)
        """
        self._validate_input(Q, FC, Total)
        self.n = Q.shape[0]  # 部门数量
        self.m = FC.shape[1]  # 需求类型数量
        self.Q = Q.astype(float)
        self.FC = FC.astype(float)
        self.Total = Total.astype(float)

        # 初始化核心矩阵
        self.A = None  # 直接消耗系数
        self.Leontief = None  # 里昂惕夫矩阵
        self.Leontief_inv = None  # 里昂惕夫逆矩阵
        self.B = None  # 完全消耗系数

        # 执行核心计算
        self._compute_core_matrices()

    def _validate_input(self, Q, FC, Total):
        """输入数据校验"""
        if Q.ndim != 2 or FC.ndim != 2:
            raise ValueError("输入矩阵必须是二维数组")
        if Q.shape[0] != Q.shape[1]:
            raise ValueError("中间产品矩阵Q必须是方阵")
        if Q.shape[0] != FC.shape[0]:
            raise ValueError("Q和FC的行数必须一致")
        if Q.shape[0] != Total.shape[0]:
            raise ValueError("总产出向量长度与矩阵维度不匹配")

        # 验证总产出平衡
        row_sum = Q.sum(axis=1) + FC.sum(axis=1)
        if not np.allclose(row_sum, Total, rtol=1e-3):
            err_info = "\n行和与总产出差异:\n" + str(row_sum - Total)
            raise ValueError("行和不等于总产出,请检查数据平衡性" + err_info)

    def _compute_core_matrices(self):
        """核心矩阵计算"""
        # 直接消耗系数矩阵 A = Q / Total(按列广播)
        self.A = self.Q / self.Total.reshape(1, -1)

        # 里昂惕夫矩阵 Leontief = I - A
        self.Leontief = np.eye(self.n) - self.A

        # 计算逆矩阵(增加异常处理)
        try:
            self.Leontief_inv = np.linalg.inv(self.Leontief)
        except np.linalg.LinAlgError as e:
            raise ValueError("里昂惕夫矩阵不可逆,请检查数据合理性") from e

        # 完全消耗系数 B = Leontief_inv - I
        self.B = self.Leontief_inv - np.eye(self.n)

    def sensitivity_analysis(self) -> Tuple[np.ndarray, np.ndarray]:
        """计算感应系数和影响系数"""
        total_avg = np.mean(self.Leontief_inv)
        inductance = np.mean(self.Leontief_inv, axis=1) / total_avg  # 感应系数
        impact = np.mean(self.Leontief_inv, axis=0) / total_avg  # 影响系数
        return inductance, impact

    def production_induction(self) -> Dict[str, np.ndarray]:
        """生产诱发计算"""
        # 生产诱发额 = Leontief逆矩阵 × 最终消费矩阵
        induce_value = self.Leontief_inv @ self.FC

        # 生产诱发指数 = 诱发额 / 各需求类型总量
        fc_total = self.FC.sum(axis=0).reshape(1, -1)
        induce_index = induce_value / fc_total

        # 最终依赖度 = 各需求诱发额 / 总诱发额
        total_induce = induce_value.sum(axis=1).reshape(-1, 1)
        dependency = induce_value / total_induce

        return {
            'induction_value': induce_value,
            'induction_index': induce_index,
            'dependency_degree': dependency
        }

    def plot_sensitivity(self, save_path: str = None):
        """绘制感应/影响系数热力图"""
        inductance, impact = self.sensitivity_analysis()

        plt.figure(figsize=(12, 6), dpi=100)

        # 感应系数子图
        plt.subplot(1, 2, 1)
        sns.heatmap(inductance.reshape(-1, 1),
                    annot=True, fmt=".2f",
                    cmap='coolwarm', center=1,
                    xticklabels=['感应系数'],
                    yticklabels=[f'部门{i + 1}' for i in range(self.n)])
        plt.title("各部门感应系数分布")

        # 影响系数子图
        plt.subplot(1, 2, 2)
        sns.heatmap(impact.reshape(-1, 1),
                    annot=True, fmt=".2f",
                    cmap='coolwarm', center=1,
                    xticklabels=['影响系数'],
                    yticklabels=[f'部门{i + 1}' for i in range(self.n)])
        plt.title("各部门影响系数分布")

        # 保存或显示
        if save_path:
            plt.savefig(save_path, bbox_inches='tight', pad_inches=0.1)
        plt.tight_layout()
        plt.show()

    def simulate_policy(self, new_FC: np.ndarray) -> Dict[str, np.ndarray]:
        """政策模拟:输入新的最终需求,返回影响结果"""
        if new_FC.shape != self.FC.shape:
            raise ValueError(f"新需求矩阵维度不匹配,应为{self.FC.shape},实际为{new_FC.shape}")

        # 计算新生产诱发额及其变化
        new_induce = self.Leontief_inv @ new_FC
        delta_induce = new_induce - self.production_induction()['induction_value']

        return {
            'new_induction': new_induce,
            'delta_induction': delta_induce,
            'growth_rate': delta_induce / self.Total.reshape(-1, 1)
        }


# ------------------------------
# 示例使用(含中文显示测试)
if __name__ == "__main__":
    # 测试数据(基于平衡关系构造)
    Q = np.array([[96, 224, 179, 160],
                  [16, 672, 77, 160],
                  [320, 336, 1024, 320],
                  [48, 336, 256, 160]])
    FC = np.array([[894, 47],
                   [1118, 197],
                   [220, 340],
                   [480, 320]])
    Total = np.array([1600, 2240, 2560, 1600])  # 验证:Q行和 + FC行和 = Total

    # 初始化分析器
    try:
        ioa = InputOutputAnalyzer(Q, FC, Total)
    except ValueError as e:
        print("数据校验失败:", str(e))
        exit()

    # 打印关键矩阵(保留两位小数)
    print("直接消耗系数矩阵 A:\n", np.round(ioa.A, 2))
    print("\n里昂惕夫逆矩阵:\n", np.round(ioa.Leontief_inv, 2))

    # 绘制中文热力图
    ioa.plot_sensitivity(save_path="sensitivity_heatmap.png")  # 保存为图片

    # 模拟消费增加20%的政策
    new_FC = FC.copy()
    new_FC[:, 0] *= 1.2  # 第一列(消费)增加20%
    sim_result = ioa.simulate_policy(new_FC)

    # 打印政策影响结果
    print("\n消费增长20%带来的产出变化:")
    print(np.round(sim_result['delta_induction'], 1))

    # 显示最终依赖度(中文标签测试)
    induction = ioa.production_induction()
    print("\n最终需求依赖度矩阵:")
    print(np.round(induction['dependency_degree'], 2))