基于python的里昂惕夫矩阵:投入产出分析
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, Dict
# ================== 中文字体配置 ==================
plt.rcParams['font.sans-serif'] = ['SimHei'] # Windows系统可用字体
# plt.rcParams['font.sans-serif'] = ['Arial Unicode MS'] # MacOS系统可用字体
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# =================================================
class InputOutputAnalyzer:
def __init__(self, Q: np.ndarray, FC: np.ndarray, Total: np.ndarray):
"""
初始化投入产出分析器
参数:
Q : 中间产品矩阵(n x n)
FC : 最终消费矩阵(n x m)
Total : 总产出向量(n x 1)
"""
self._validate_input(Q, FC, Total)
self.n = Q.shape[0] # 部门数量
self.m = FC.shape[1] # 需求类型数量
self.Q = Q.astype(float)
self.FC = FC.astype(float)
self.Total = Total.astype(float)
# 初始化核心矩阵
self.A = None # 直接消耗系数
self.Leontief = None # 里昂惕夫矩阵
self.Leontief_inv = None # 里昂惕夫逆矩阵
self.B = None # 完全消耗系数
# 执行核心计算
self._compute_core_matrices()
def _validate_input(self, Q, FC, Total):
"""输入数据校验"""
if Q.ndim != 2 or FC.ndim != 2:
raise ValueError("输入矩阵必须是二维数组")
if Q.shape[0] != Q.shape[1]:
raise ValueError("中间产品矩阵Q必须是方阵")
if Q.shape[0] != FC.shape[0]:
raise ValueError("Q和FC的行数必须一致")
if Q.shape[0] != Total.shape[0]:
raise ValueError("总产出向量长度与矩阵维度不匹配")
# 验证总产出平衡
row_sum = Q.sum(axis=1) + FC.sum(axis=1)
if not np.allclose(row_sum, Total, rtol=1e-3):
err_info = "\n行和与总产出差异:\n" + str(row_sum - Total)
raise ValueError("行和不等于总产出,请检查数据平衡性" + err_info)
def _compute_core_matrices(self):
"""核心矩阵计算"""
# 直接消耗系数矩阵 A = Q / Total(按列广播)
self.A = self.Q / self.Total.reshape(1, -1)
# 里昂惕夫矩阵 Leontief = I - A
self.Leontief = np.eye(self.n) - self.A
# 计算逆矩阵(增加异常处理)
try:
self.Leontief_inv = np.linalg.inv(self.Leontief)
except np.linalg.LinAlgError as e:
raise ValueError("里昂惕夫矩阵不可逆,请检查数据合理性") from e
# 完全消耗系数 B = Leontief_inv - I
self.B = self.Leontief_inv - np.eye(self.n)
def sensitivity_analysis(self) -> Tuple[np.ndarray, np.ndarray]:
"""计算感应系数和影响系数"""
total_avg = np.mean(self.Leontief_inv)
inductance = np.mean(self.Leontief_inv, axis=1) / total_avg # 感应系数
impact = np.mean(self.Leontief_inv, axis=0) / total_avg # 影响系数
return inductance, impact
def production_induction(self) -> Dict[str, np.ndarray]:
"""生产诱发计算"""
# 生产诱发额 = Leontief逆矩阵 × 最终消费矩阵
induce_value = self.Leontief_inv @ self.FC
# 生产诱发指数 = 诱发额 / 各需求类型总量
fc_total = self.FC.sum(axis=0).reshape(1, -1)
induce_index = induce_value / fc_total
# 最终依赖度 = 各需求诱发额 / 总诱发额
total_induce = induce_value.sum(axis=1).reshape(-1, 1)
dependency = induce_value / total_induce
return {
'induction_value': induce_value,
'induction_index': induce_index,
'dependency_degree': dependency
}
def plot_sensitivity(self, save_path: str = None):
"""绘制感应/影响系数热力图"""
inductance, impact = self.sensitivity_analysis()
plt.figure(figsize=(12, 6), dpi=100)
# 感应系数子图
plt.subplot(1, 2, 1)
sns.heatmap(inductance.reshape(-1, 1),
annot=True, fmt=".2f",
cmap='coolwarm', center=1,
xticklabels=['感应系数'],
yticklabels=[f'部门{i + 1}' for i in range(self.n)])
plt.title("各部门感应系数分布")
# 影响系数子图
plt.subplot(1, 2, 2)
sns.heatmap(impact.reshape(-1, 1),
annot=True, fmt=".2f",
cmap='coolwarm', center=1,
xticklabels=['影响系数'],
yticklabels=[f'部门{i + 1}' for i in range(self.n)])
plt.title("各部门影响系数分布")
# 保存或显示
if save_path:
plt.savefig(save_path, bbox_inches='tight', pad_inches=0.1)
plt.tight_layout()
plt.show()
def simulate_policy(self, new_FC: np.ndarray) -> Dict[str, np.ndarray]:
"""政策模拟:输入新的最终需求,返回影响结果"""
if new_FC.shape != self.FC.shape:
raise ValueError(f"新需求矩阵维度不匹配,应为{self.FC.shape},实际为{new_FC.shape}")
# 计算新生产诱发额及其变化
new_induce = self.Leontief_inv @ new_FC
delta_induce = new_induce - self.production_induction()['induction_value']
return {
'new_induction': new_induce,
'delta_induction': delta_induce,
'growth_rate': delta_induce / self.Total.reshape(-1, 1)
}
# ------------------------------
# 示例使用(含中文显示测试)
if __name__ == "__main__":
# 测试数据(基于平衡关系构造)
Q = np.array([[96, 224, 179, 160],
[16, 672, 77, 160],
[320, 336, 1024, 320],
[48, 336, 256, 160]])
FC = np.array([[894, 47],
[1118, 197],
[220, 340],
[480, 320]])
Total = np.array([1600, 2240, 2560, 1600]) # 验证:Q行和 + FC行和 = Total
# 初始化分析器
try:
ioa = InputOutputAnalyzer(Q, FC, Total)
except ValueError as e:
print("数据校验失败:", str(e))
exit()
# 打印关键矩阵(保留两位小数)
print("直接消耗系数矩阵 A:\n", np.round(ioa.A, 2))
print("\n里昂惕夫逆矩阵:\n", np.round(ioa.Leontief_inv, 2))
# 绘制中文热力图
ioa.plot_sensitivity(save_path="sensitivity_heatmap.png") # 保存为图片
# 模拟消费增加20%的政策
new_FC = FC.copy()
new_FC[:, 0] *= 1.2 # 第一列(消费)增加20%
sim_result = ioa.simulate_policy(new_FC)
# 打印政策影响结果
print("\n消费增长20%带来的产出变化:")
print(np.round(sim_result['delta_induction'], 1))
# 显示最终依赖度(中文标签测试)
induction = ioa.production_induction()
print("\n最终需求依赖度矩阵:")
print(np.round(induction['dependency_degree'], 2))