PostgreSQL_数据下载并保存(psycopg2)

发布于:2025-03-22 ⋅ 阅读:(16) ⋅ 点赞:(0)

目录

前置:

1 数据下载

1.1 多个股票多个交易日

1.2 一个交易日所有股票 

2 数据保存,使用python中的psycopg2包

2.1 在PyCharm中创建新项目,并安装包

2.2  代码-多个股票多个交易日

2.3 代码-一个交易日所有股票

2.4 在 pgAdmin4 中查看数据存储是否正确

1)t_daily

2) t_stock_daily


前置:

本博文属于一个系列,请参见本人“数据库专栏”中以“PostgreSQL_”开头的博文。

1 数据下载

本实例的数据都下载自优矿,具体如何下载可自行上优矿官网,这里不赘述。

下载后的csv文件如下:

1.1 多个股票多个交易日

这些是沪深两市 2023-07-11 到 2025-02-12 所有股票的日数据。

1.2 一个交易日所有股票 

2 数据保存,使用python中的psycopg2包

2.1 在PyCharm中创建新项目,并安装包

依次执行 pip install psycopg2 和 pip install pandas 安装所需的包。

2.2  代码-多个股票多个交易日

import psycopg2
import os
import pandas as pd
from datetime import datetime

def connect_db(database='db_stock', user='postgres', password='写你自己的密码',
               host='127.0.0.1', port=5432):
    """Open a psycopg2 connection to the stock database.

    The defaults reproduce the original hard-coded settings, so existing
    ``connect_db()`` calls behave as before; pass keyword arguments to target
    another server or account without editing this function.

    :param database: database name.
    :param user: login role.
    :param password: password for ``user`` (replace the placeholder default).
    :param host: server host name or address.
    :param port: server port.
    :return: an open connection, or ``None`` if connecting failed. The
        failure reason is printed and no exception is raised.
    """
    try:
        return psycopg2.connect(database=database, user=user, password=password,
                                host=host, port=port)
    except Exception as e:
        # Best-effort by design: report the problem and let the caller
        # check for None instead of raising.
        print(f'connection failed。{e}')
        return None

# 批量插入数据
def many_day_insert(data_dir: str):
    '''
    Bulk-load daily quotes for many stocks over many trading days.

    Every regular file in ``data_dir`` is read as a CSV export (one row per
    stock per trading day). The rows are written to two tables in one
    transaction:

    1 Vertical data -- t_stock_daily, one row per ticker whose columns are arrays
      1.1 look up which tickers from the files already have a row in the table
      1.2 new tickers -> INSERT; existing tickers -> UPDATE, extending every
          array with array_cat
    2 Horizontal data -- t_daily, one row per trade date
      2.1 group by trade date and INSERT directly

    If the connection cannot be opened, the function returns without writing.
    On a database error the transaction is rolled back and the error is
    printed; in neither case is anything raised to the caller.

    :param data_dir: directory holding the CSV files; a trailing path
        separator is optional.
    :return: None
    '''
    col_list = ['secID', 'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice',
                'turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate', 'negMarketValue',
                'marketValue', 'chgPct', 'PE', 'PE1', 'PB', 'isOpen', 'vwap']
    # Columns stored as arrays in t_stock_daily, in the order the vertical SQL expects.
    array_cols = col_list[1:]
    # Columns stored in t_daily after tradeDate, tradeDateOj and ticker.
    daily_cols = col_list[2:]

    frames = []
    for file_name in sorted(os.listdir(data_dir)):
        # os.path.join works whether or not data_dir ends with a separator.
        file_path = os.path.join(data_dir, file_name)
        if not os.path.isfile(file_path):
            continue
        frames.append(pd.read_csv(file_path, encoding='utf-8').loc[:, col_list])
    if not frames:
        print(f'no data files found in {data_dir}')
        return

    # Concatenate once; calling concat inside the loop is quadratic. Then sort
    # by date, because os.listdir order is arbitrary and each ticker's arrays
    # must come out chronological. Assumes tradeDate is an ISO 'YYYY-MM-DD'
    # string, which the strptime call below also requires.
    df = pd.concat(frames, ignore_index=True)
    df['ticker'] = df['secID'].str.slice(0, 6)
    df = df.sort_values(['tradeDate', 'ticker'], ignore_index=True)
    ticker_list = df['ticker'].unique().tolist()

    # Per-ticker column arrays for t_stock_daily. to_list() yields native
    # Python values that psycopg2 can adapt.
    ticker_arrays = {
        ticker: [group[col].to_list() for col in array_cols]
        for ticker, group in df.groupby(by='ticker')
    }
    # One t_daily row per trade date, holding every ticker's values for that day.
    h_i_list = [
        (date_str, datetime.strptime(date_str, '%Y-%m-%d'), group['ticker'].to_list(),
         *[group[col].to_list() for col in daily_cols])
        for date_str, group in df.groupby(by='tradeDate')
    ]

    # Tickers from the files that already have a row in t_stock_daily. The list
    # is bound as an array parameter instead of being spliced into the SQL text.
    sql_vertical_q_str = "select ticker from t_stock_daily where ticker = any(%s);"

    sql_vertical_i_str = '''
    insert into t_stock_daily(ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''

    sql_vertical_u_str = '''
    update t_stock_daily set tradeDate=array_cat(tradeDate,%s),openPrice=array_cat(openPrice,%s),highestPrice=array_cat(highestPrice,%s),lowestPrice=array_cat(lowestPrice,%s),closePrice=array_cat(closePrice,%s),turnoverVol=array_cat(turnoverVol,%s),turnoverValue=array_cat(turnoverValue,%s),dealAmount=array_cat(dealAmount,%s),turnoverRate=array_cat(turnoverRate,%s),negMarketValue=array_cat(negMarketValue,%s),marketValue=array_cat(marketValue,%s),chgPct=array_cat(chgPct,%s),PE=array_cat(PE,%s),PE1=array_cat(PE1,%s),PB=array_cat(PB,%s),isOpen=array_cat(isOpen,%s),vwap=array_cat(vwap,%s) where ticker=%s;
    '''

    sql_horizonal_i_str = '''
    insert into t_daily(tradeDate,tradeDateOj,ticker,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''

    conn = connect_db()
    if conn is None:
        # connect_db has already printed why the connection failed.
        return
    try:
        with conn.cursor() as cur:
            cur.execute(sql_vertical_q_str, (ticker_list,))
            existing_tickers = {one_node[0] for one_node in cur.fetchall()}

            # New tickers get a fresh row; existing ones have their arrays
            # extended. The update takes the arrays first, then the WHERE ticker.
            v_i_list = [(ticker, *arrays) for ticker, arrays in ticker_arrays.items()
                        if ticker not in existing_tickers]
            v_u_list = [(*arrays, ticker) for ticker, arrays in ticker_arrays.items()
                        if ticker in existing_tickers]

            cur.executemany(sql_vertical_i_str, v_i_list)
            cur.executemany(sql_vertical_u_str, v_u_list)
            cur.executemany(sql_horizonal_i_str, h_i_list)
        conn.commit()
    except Exception as e:
        print(f'error: {e}')
        conn.rollback()
    finally:
        conn.close()


if __name__ == '__main__':
    # Bulk-load every CSV file found in the download directory.
    data_dir = r'E:/temp005/'
    many_day_insert(data_dir)

2.3 代码-一个交易日所有股票

def one_day_insert(file_path: str):
    '''
    Load one trading day's quotes for all stocks from a single CSV file.

    1 Vertical data -- t_stock_daily, one row per ticker whose columns are arrays
      1.1 tickers not yet in the table -> INSERT with one-element arrays
      1.2 existing tickers -> UPDATE, appending each value with array_append
    2 Horizontal data -- t_daily
      2.1 INSERT one row holding every ticker's values for the day

    The trade date written to t_daily is taken from the first data row.
    NOTE(review): the file is assumed to hold a single trading day; this is
    not checked.

    If the connection cannot be opened, the function returns without writing.
    On a database error the transaction is rolled back and the error is
    printed; in neither case is anything raised to the caller.

    :param file_path: path to the CSV file.
    :return: None
    '''
    col_list = ['secID', 'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice', 'turnoverVol',
                'turnoverValue', 'dealAmount', 'turnoverRate', 'negMarketValue', 'marketValue', 'chgPct', 'PE', 'PE1',
                'PB', 'isOpen', 'vwap']
    df = pd.read_csv(file_path, encoding='utf-8').loc[:, col_list].copy()
    if df.empty:
        print(f'no data rows in {file_path}')
        return
    df['ticker'] = df['secID'].str.slice(0, 6)

    # Columns stored as arrays in t_stock_daily, in the order the vertical SQL expects.
    array_cols = col_list[1:]
    # One plain Python list per column; to_list() yields native values psycopg2 can adapt.
    columns = {col: df[col].to_list() for col in array_cols}
    tickers = df['ticker'].to_list()
    # Per-row values tradeDate..vwap, aligned with `tickers`.
    rows = list(zip(*(columns[col] for col in array_cols)))

    # Positional access, so a non-default index cannot break the lookup.
    date_str = df['tradeDate'].iloc[0]
    h_one = (date_str, datetime.strptime(date_str, '%Y-%m-%d'), tickers,
             *[columns[col] for col in col_list[2:]])

    # Tickers from the file that already have a row in t_stock_daily. The list
    # is bound as an array parameter instead of being spliced into the SQL text.
    sql_vertical_q_str = "select ticker from t_stock_daily where ticker = any(%s);"

    sql_vertical_i_str = '''
        insert into t_stock_daily(ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
        '''

    sql_vertical_u_str = '''
            update t_stock_daily set tradeDate=array_append(tradeDate,%s),openPrice=array_append(openPrice,%s),highestPrice=array_append(highestPrice,%s),lowestPrice=array_append(lowestPrice,%s),closePrice=array_append(closePrice,%s),turnoverVol=array_append(turnoverVol,%s),turnoverValue=array_append(turnoverValue,%s),dealAmount=array_append(dealAmount,%s),turnoverRate=array_append(turnoverRate,%s),negMarketValue=array_append(negMarketValue,%s),marketValue=array_append(marketValue,%s),chgPct=array_append(chgPct,%s),PE=array_append(PE,%s),PE1=array_append(PE1,%s),PB=array_append(PB,%s),isOpen=array_append(isOpen,%s),vwap=array_append(vwap,%s) where ticker=%s;
            '''

    sql_horizonal_i_str = '''
        insert into t_daily(tradeDate,tradeDateOj,ticker,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
        '''

    conn = connect_db()
    if conn is None:
        # connect_db has already printed why the connection failed.
        return
    try:
        with conn.cursor() as cur:
            cur.execute(sql_vertical_q_str, (sorted(set(tickers)),))
            existing_tickers = {one_node[0] for one_node in cur.fetchall()}

            v_i_list = []
            v_u_list = []
            for ticker, values in zip(tickers, rows):
                if ticker in existing_tickers:
                    # array_append takes scalars: the values, then the WHERE ticker.
                    v_u_list.append((*values, ticker))
                else:
                    # A new row starts every column as a one-element array.
                    v_i_list.append((ticker, *[[value] for value in values]))

            cur.executemany(sql_vertical_i_str, v_i_list)
            cur.executemany(sql_vertical_u_str, v_u_list)
            cur.execute(sql_horizonal_i_str, h_one)
        conn.commit()
    except Exception as e:
        print(f'error: {e}')
        conn.rollback()
    finally:
        conn.close()

2.4 在 pgAdmin4 中查看数据存储是否正确

上面的操作存储的是 2023年7月11日 到 2025年2月13日 的数据。

1)t_daily

查询 t_daily 中最小日期和最大日期

select max(tradeDateOj) from t_daily;
select min(tradeDateOj) from t_daily;

 2) t_stock_daily

查看某一股票的第一条数据和最后一条数据

select tradeDate[1],openPrice[1],highestPrice[1],lowestPrice[1],closePrice[1],turnoverVol[1],turnoverValue[1],dealAmount[1],turnoverRate[1],negMarketValue[1],marketValue[1],chgPct[1],PE[1],PE1[1],PB[1],isOpen[1],vwap[1] from t_stock_daily where ticker='000001';

 select tradeDate[array_length(tradeDate,1)],openPrice[array_length(openPrice,1)],highestPrice[array_length(highestPrice,1)],lowestPrice[array_length(lowestPrice,1)],closePrice[array_length(closePrice,1)],turnoverVol[array_length(turnoverVol,1)],turnoverValue[array_length(turnoverValue,1)],dealAmount[array_length(dealAmount,1)],turnoverRate[array_length(turnoverRate,1)],negMarketValue[array_length(negMarketValue,1)],marketValue[array_length(marketValue,1)],chgPct[array_length(chgPct,1)],PE[array_length(PE,1)],PE1[array_length(PE1,1)],PB[array_length(PB,1)],isOpen[array_length(isOpen,1)],vwap[array_length(vwap,1)] from t_stock_daily where ticker='000001';