目录
前置:
本博文属于一个系列,该系列即本人“数据库专栏”中以“PostgreSQL_”开头的博文。
1 数据下载
本示例的数据均下载自优矿,具体下载方法请参考优矿官网,这里不再赘述。
下载后的csv文件如下:
1.1 多个股票多个交易日
这些是沪深两市所有股票 2023-07-11 至 2025-02-12 的日线数据。
1.2 一个交易日所有股票
2 数据保存,使用python中的psycopg2包
2.1 在PyCharm中创建新项目,并安装包
pip install psycopg2 pandas
2.2 代码-多个股票多个交易日
import psycopg2
import os
import pandas as pd
from datetime import datetime
def connect_db(database='db_stock', user='postgres', password='写你自己的密码',
               host='127.0.0.1', port=5432):
    '''
    Open a psycopg2 connection to the PostgreSQL server.

    The defaults are the settings used throughout this post. Replace the
    default ``password`` with your own, or pass it in explicitly.

    :param database: database name.
    :param user: login role.
    :param password: password for ``user``.
    :param host: server host name or IP address.
    :param port: server port.
    :return: an open psycopg2 connection, or None if connecting failed (the
             error is printed). Callers must check for None before use.
    '''
    try:
        return psycopg2.connect(database=database, user=user, password=password,
                                host=host, port=port)
    except Exception as e:
        # Best-effort: report the failure and let the caller decide what to do.
        print(f'connection failed。{e}')
        return None
# Batch-insert data that covers many trading days.
def many_day_insert(data_dir: str):
    '''
    Load every ``.csv`` file in ``data_dir`` and store the rows in two layouts.

    1 Vertical table ``t_stock_daily`` (one row per ticker, array columns)
      1.1 Ask the database which incoming tickers already have a row.
      1.2 Tickers without a row are INSERTed. Tickers that have one get each
          array extended with ``array_cat``.
    2 Horizontal table ``t_daily`` (one row per trade date, array columns)
      2.1 Group by trade date and INSERT one row per date.

    Rows are sorted by (ticker, tradeDate) first, so every array is in date
    order no matter what order ``os.listdir`` returns the files in. Existing
    arrays are only appended to: re-loading dates that are already stored
    duplicates them.

    :param data_dir: directory containing the downloaded CSV files. A trailing
                     slash is optional.
    :return: None. Errors while writing are printed and the transaction is
             rolled back.
    '''
    col_list = ['secID', 'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice',
                'turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate', 'negMarketValue',
                'marketValue', 'chgPct', 'PE', 'PE1', 'PB', 'isOpen', 'vwap']
    # Columns stored as arrays, in the order the SQL statements below expect.
    array_cols = col_list[1:]   # tradeDate .. vwap
    value_cols = col_list[2:]   # openPrice .. vwap

    # Only regular .csv files; sorted so the load order is deterministic.
    file_names = sorted(
        name for name in os.listdir(data_dir)
        if name.lower().endswith('.csv') and os.path.isfile(os.path.join(data_dir, name))
    )
    if not file_names:
        print(f'no csv files found in {data_dir}')
        return
    # Read every file, then concatenate once (concat inside a loop is quadratic).
    frames = [
        pd.read_csv(os.path.join(data_dir, name), encoding='utf-8').loc[:, col_list]
        for name in file_names
    ]
    df = pd.concat(frames, ignore_index=True)
    if df.empty:
        print(f'no rows found in the csv files under {data_dir}')
        return
    df['ticker'] = df['secID'].str.slice(0, 6)
    # 'YYYY-MM-DD' strings sort chronologically, so each array ends up in date order.
    df = df.sort_values(['ticker', 'tradeDate'])
    ticker_list = df['ticker'].unique().tolist()

    # Incoming tickers that already have a row. The list is passed as a bound
    # parameter (adapted to an ARRAY) instead of being spliced into the SQL text.
    sql_vertical_q_str = "select ticker from t_stock_daily where ticker = any(%s::text[]);"
    sql_vertical_i_str = '''
    insert into t_stock_daily(ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''
    sql_vertical_u_str = '''
    update t_stock_daily set tradeDate=array_cat(tradeDate,%s),openPrice=array_cat(openPrice,%s),highestPrice=array_cat(highestPrice,%s),lowestPrice=array_cat(lowestPrice,%s),closePrice=array_cat(closePrice,%s),turnoverVol=array_cat(turnoverVol,%s),turnoverValue=array_cat(turnoverValue,%s),dealAmount=array_cat(dealAmount,%s),turnoverRate=array_cat(turnoverRate,%s),negMarketValue=array_cat(negMarketValue,%s),marketValue=array_cat(marketValue,%s),chgPct=array_cat(chgPct,%s),PE=array_cat(PE,%s),PE1=array_cat(PE1,%s),PB=array_cat(PB,%s),isOpen=array_cat(isOpen,%s),vwap=array_cat(vwap,%s) where ticker=%s;
    '''
    sql_horizonal_i_str = '''
    insert into t_daily(tradeDate,tradeDateOj,ticker,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''

    conn = connect_db()
    if conn is None:
        # connect_db has already printed the reason.
        return
    cur = conn.cursor()
    try:
        cur.execute(sql_vertical_q_str, (ticker_list,))
        existing = {row[0] for row in cur.fetchall()}

        v_i_list = []
        v_u_list = []
        for ticker, group in df.groupby(by='ticker'):
            arrays = [group[c].to_list() for c in array_cols]
            if ticker in existing:
                # UPDATE parameters: the arrays first, then the WHERE ticker.
                v_u_list.append((*arrays, ticker))
            else:
                v_i_list.append((ticker, *arrays))

        h_i_list = [
            (
                date_str,
                datetime.strptime(date_str, '%Y-%m-%d'),
                group['ticker'].to_list(),
                *(group[c].to_list() for c in value_cols),
            )
            for date_str, group in df.groupby(by='tradeDate')
        ]

        cur.executemany(sql_vertical_i_str, v_i_list)
        cur.executemany(sql_vertical_u_str, v_u_list)
        cur.executemany(sql_horizonal_i_str, h_i_list)
        conn.commit()
    except Exception as e:
        print(f'error: {e}')
        conn.rollback()
    finally:
        cur.close()
        conn.close()
if __name__ == '__main__':
    # Folder holding the downloaded multi-day CSV files.
    csv_dir = r'E:/temp005/'
    many_day_insert(csv_dir)
2.3 代码-一个交易日所有股票
def one_day_insert(file_path: str):
    '''
    Store one trading day's CSV (all stocks for a single date) in two layouts.

    1 Vertical table ``t_stock_daily`` (one row per ticker, array columns)
      1.1 Tickers without a row yet are INSERTed with one-element arrays.
      1.2 Tickers that already have a row get each array extended with
          ``array_append``.
    2 Horizontal table ``t_daily``
      2.1 INSERT a single row for the file's trade date.

    The file must contain exactly one distinct ``tradeDate``. Otherwise nothing
    is written, because every row would be filed under a single date.

    :param file_path: path of the CSV file to load.
    :return: None. Errors while writing are printed and the transaction is
             rolled back.
    '''
    col_list = ['secID', 'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice', 'turnoverVol',
                'turnoverValue', 'dealAmount', 'turnoverRate', 'negMarketValue', 'marketValue', 'chgPct', 'PE', 'PE1',
                'PB', 'isOpen', 'vwap']
    # Columns stored as arrays, in the order the SQL statements below expect.
    array_cols = col_list[1:]   # tradeDate .. vwap
    value_cols = col_list[2:]   # openPrice .. vwap

    df = pd.read_csv(file_path, encoding='utf-8').loc[:, col_list].copy()
    dates = df['tradeDate'].unique()
    if len(dates) != 1:
        # Covers an empty file (0 dates) as well as a multi-day file.
        print(f'expected exactly one tradeDate in {file_path}, found {len(dates)}')
        return
    date_str = dates[0]
    df['ticker'] = df['secID'].str.slice(0, 6)
    # Keep the t_daily ticker array in a stable, sorted order.
    df = df.sort_values('ticker')
    ticker_list = df['ticker'].unique().tolist()

    # Incoming tickers that already have a row, passed as a bound ARRAY parameter.
    sql_vertical_q_str = "select ticker from t_stock_daily where ticker = any(%s::text[]);"
    sql_vertical_i_str = '''
    insert into t_stock_daily(ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''
    sql_vertical_u_str = '''
    update t_stock_daily set tradeDate=array_append(tradeDate,%s),openPrice=array_append(openPrice,%s),highestPrice=array_append(highestPrice,%s),lowestPrice=array_append(lowestPrice,%s),closePrice=array_append(closePrice,%s),turnoverVol=array_append(turnoverVol,%s),turnoverValue=array_append(turnoverValue,%s),dealAmount=array_append(dealAmount,%s),turnoverRate=array_append(turnoverRate,%s),negMarketValue=array_append(negMarketValue,%s),marketValue=array_append(marketValue,%s),chgPct=array_append(chgPct,%s),PE=array_append(PE,%s),PE1=array_append(PE1,%s),PB=array_append(PB,%s),isOpen=array_append(isOpen,%s),vwap=array_append(vwap,%s) where ticker=%s;
    '''
    sql_horizonal_i_str = '''
    insert into t_daily(tradeDate,tradeDateOj,ticker,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''

    conn = connect_db()
    if conn is None:
        # connect_db has already printed the reason.
        return
    cur = conn.cursor()
    try:
        cur.execute(sql_vertical_q_str, (ticker_list,))
        existing = {row[0] for row in cur.fetchall()}

        v_i_list = []
        v_u_list = []
        # to_list() yields plain Python scalars, which psycopg2 adapts directly.
        columns = [df[c].to_list() for c in ('ticker', *array_cols)]
        for ticker, *values in zip(*columns):
            if ticker in existing:
                # UPDATE parameters: the scalar values first, then the WHERE ticker.
                v_u_list.append((*values, ticker))
            else:
                # New ticker: every array column starts as a one-element array.
                v_i_list.append((ticker, *([v] for v in values)))

        h_one = (
            date_str,
            datetime.strptime(date_str, '%Y-%m-%d'),
            df['ticker'].to_list(),
            *(df[c].to_list() for c in value_cols),
        )

        cur.executemany(sql_vertical_i_str, v_i_list)
        cur.executemany(sql_vertical_u_str, v_u_list)
        cur.execute(sql_horizonal_i_str, h_one)
        conn.commit()
    except Exception as e:
        print(f'error: {e}')
        conn.rollback()
    finally:
        cur.close()
        conn.close()
2.4 在 pgAdmin4 中查看数据存储是否正确
上面的操作存储了 2023年7月11日 到 2025年2月13日 的数据。
1)t_daily
查询 t_daily 中最小日期和最大日期
select max(tradeDateOj) from t_daily;
select min(tradeDateOj) from t_daily;
2) t_stock_daily
查看某一股票的第一条数据和最后一条数据
select tradeDate[1],openPrice[1],highestPrice[1],lowestPrice[1],closePrice[1],turnoverVol[1],turnoverValue[1],dealAmount[1],turnoverRate[1],negMarketValue[1],marketValue[1],chgPct[1],PE[1],PE1[1],PB[1],isOpen[1],vwap[1] from t_stock_daily where ticker='000001';
select tradeDate[array_length(tradeDate,1)],openPrice[array_length(openPrice,1)],highestPrice[array_length(highestPrice,1)],lowestPrice[array_length(lowestPrice,1)],closePrice[array_length(closePrice,1)],turnoverVol[array_length(turnoverVol,1)],turnoverValue[array_length(turnoverValue,1)],dealAmount[array_length(dealAmount,1)],turnoverRate[array_length(turnoverRate,1)],negMarketValue[array_length(negMarketValue,1)],marketValue[array_length(marketValue,1)],chgPct[array_length(chgPct,1)],PE[array_length(PE,1)],PE1[array_length(PE1,1)],PB[array_length(PB,1)],isOpen[array_length(isOpen,1)],vwap[array_length(vwap,1)] from t_stock_daily where ticker='000001';