Python写百万条记录到PostgreSQL数据库的方法哪种最快?

Python对PostgreSQL数据库进行操作,经常会用到第三方工具psycopg2,其中插入或更新数据表的几种方法速度差别比较大,本文对几种常用的方法做了比较,内容从程序化股票交易系统中提取而来。在程序化股票交易系统中,除了开盘前读入必要的历史数据之外,还需要在开盘期间保存全部股票的实时行情,以便在系统出现故障或断电的情况下,快速恢复系统运行。

A股全市场共有5000多只股票,程序化交易系统使用的数据有日线数据、30分钟线数据、5分钟线数据和1分钟线数据,这些数据每分钟都要更新一次,那么每分钟更新的数据大约要保存或更新20000多条数据,这需要在10秒钟内完成,因为整个系统每分钟运行一次,除了处理行情数据之外,还要有足够的时间进行策略计算和生成交易信号,另外还要个系统运行留出大约10秒中的余量。因此,以最快的速度处理并保存全部股票的实时行情数据尤为重要。

Python写百万条记录到PostgreSQL数据库的方法哪种最快?

本文定义了以下六种方法,把psycopg2提供的插入数据到数据库的方法封装起来,比较这些方法的执行速度,选出其中最快的方法在系统中使用。六种方法分别介绍如下:

  • 导入必要的工具包:
import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras as extras
import sys
import os
from io import StringIO,BytesIO
import time
from sqlalchemy import create_engine
  • 在PostgreSQL数据库中创建数据表stockmd_mine, 代码如下:
CREATE TABLE IF NOT EXISTS public.stockmd_min1 (
	  id integer,	
  	symbol TEXT,
  	dt_occured TIMESTAMP,  
  	open_p NUMERIC,
  	high_p NUMERIC,
  	low_p NUMERIC,
  	close_p NUMERIC,
  	volume_p NUMERIC,
  	amnt_p NUMERIC	  
);
  • 定义PostgreSQL数据库连接
param_dic = {
    "host"      : "localhost",
    "database"  : "tutorial",
    "user"      : "postgres",
    "password"  : "caspar"
}

def connect(params_dic):    
    conn = None
    try:        
        print(' 连接到 PostgreSQL 数据库...')
        conn = psycopg2.connect(**params_dic)
    except(Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1)
    print(" 连接成功!")
    return conn
  • 准备测试数据:把股票的1分钟行情数据从对应的文本文件中读取1分钟行情数据
def HistoryMD_mins(finalmd_filelist,file_path):    
    # 基于finalmd_list读入1分钟历史数据
    print('\n 正在读取 1 分钟历史数据...')
    dfmin1_list_tmp = []
    for filename_min1 in finalmd_filelist:   
        try:  
            headers = ['symbol', 'dt_occured', 'open_p', 'high_p','low_p','close_p','volume_p','amnt_p'] 
            dtypes = {'symbol':'str', 'dt_occured':'str', 'open_p':'float', 'high_p':'float','low_p':'float','close_p':'float','volume_p':'int','amnt_p':'float'}
            parse_dates = ['dt_occured']  # 字符型解析为日期型       
            df = pd.read_csv(file_path+filename_min1, sep=',', header=None, names=headers, dtype=dtypes, parse_dates=parse_dates)         
            df['dt_occured']= pd.to_datetime(df['dt_occured'],format='%Y-%m-%d %H:%M')            
            if (len(df) > 0):            
                #     df_cut = df[(len(df)-10800):]
                #     df_cut.set_index('dt_occured',inplace= True)   
                #     dfmin1_list_tmp.append(df_cut)         
                # else:      
                #     #print(f'{filename_min1} 的长度:{len(df)}') 
                # df.set_index('dt_occured',inplace= True)        
                dfmin1_list_tmp.append(df)
            else:
                pass
        except:        
            print(' 报错 - min1:',filename_min1)
    return dfmin1_list_tmp

## 读取列表中文本文件中的1分钟行情数据,后面运行时有53只股票的数据
finalmd_filelist = ['SZ#002142.csv','SH#600111.csv','SZ#000831.csv','SZ#300748.csv',
                    'SZ#000629.csv','SH#600259.csv','SH#600507.csv','SZ#000630.csv']  
tmpdf_list= HistoryMD_mins(finalmd_filelist,"D:\\SecurityData\\MD_1min\\")
df = pd.concat(tmpdf_list)

方法1:封装execute_values()函数,用来保存行情数据到数据库

根据dataframe创建元组列表,然后把每行数据按照列顺序连接起来,再执行SQL出入语句

def execute_values(conn, df, table):   
    print(' 正在调用函数 execute_values() 写数据到数据库,请稍等...')
    # 根据dataframe创建元组列表
    tuples = [tuple(x) for x in df.to_numpy()]
    # dataframe的列
    cols = ','.join(list(df.columns))
    # execute方法
    cursor = conn.cursor()    
    query  = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)    
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 execute_values() 完毕!")
    cursor.close()

方法2:封装copy_from()函数,用以保存行情数据到数据库

先把dataframe数据保存为一个临时文件,放在硬盘,然后再打开并读取数据,最后使用copy_from()函数把内容写入数据库中,

def copy_from_file(conn, df, table):
		print(' 正在调用自定义函数 copy_from_file() 写数据到数据库,请稍等...')    
    # 把dataframe数据保存为一个临时文件,放在硬盘
    tmp_df = "tmp_dataframe.csv"
    df.to_csv(tmp_df, index_label='id', header=False)
    f = open(tmp_df, 'r')    
    cursor = conn.cursor()    
    try:
        cursor.copy_from(f, table, sep=",",null ='\\N',size = 32768,columns = None )
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        # os.remove(tmp_df)
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 copy_from_file() 完毕!")
    cursor.close()
    # os.remove(tmp_df)  

方法3:封装copy_from()函数,使用StringIO方法读取数据

先把dataframe数据保存到内存中,然后从内存读取数据,最后使用copy_from()函数把内容复制数据库的表中,

def copy_from_stringio(conn, df, table):    
    print(' 正在调用自定义函数 copy_from_stringio() 写数据到数据库,请稍等...')
    
    # 把dataframe保存到内存中
    buffer = StringIO()
    df.to_csv(buffer, index_label='id', header=False)
    buffer.seek(0)
    
    cursor = conn.cursor()
    try:
        cursor.copy_from(buffer, table, sep=",",null ='\\N',size = 32768,columns = None )
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 copy_from_stringio() 完毕!")
    cursor.close()

方法4:封装execute_mogrify()函数,

使用cursor.mogrify()函数编写大批量插入数据的语句,然后由cursor.execute()执行。

print(' 正在调用自定义函数 execute_mogrify() 写数据到数据库,请稍等...')
    df.reset_index(level=0, inplace=False) #是否把索引列转换为普通列
    
    # 根据dataframe的值创建元组列表
    tuples = [tuple(x) for x in df.to_numpy()]
    # print(len(tuples))
    # print(tuples[-1])
    # dataframe的列  
    cols = ','.join(list(df.columns))    
    # print(type(cols))
    # print(df.columns)
    # SQL quert to execute
    cursor = conn.cursor()
    # values = [cursor.mogrify("(%s,%s,%s)", tup).decode('utf8') for tup in tuples]
    # query  = "INSERT INTO %s(%s) VALUES " % (table, cols) + ",".join(values)
    values = [cursor.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s)", tup).decode('utf8') for tup in tuples]
    query  = "INSERT INTO %s(%s) VALUES " % (table, cols) + ",".join(values)
    
    try:
        cursor.execute(query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 execute_mogrify() 完毕!")
    cursor.close()

方法5:使用sqlalchemy工具包

使用sqlalchemy工具包时,要用create_engine另外建立连接数据库的方法。连接字符串示例如:’postgresql+psycopg2://postgres:123@localhost:5432/postgres’,其中的参数如:用户名:密码@localhost:端口/数据库

def using_sqlalchemy(df,md_min1):  
    print(' 正在调用自定义函数 using_sqlalchemy() 写数据到数据库,请稍等...')
    connect4sqlalchemy = "postgresql+psycopg2://postgres:caspar@localhost:5432/tutorial"    
    engine = create_engine(connect4sqlalchemy)
    df.to_sql(
        md_min1, 
        con=engine, 
        index=True, 
        if_exists='replace'
    )
    print(" 执行 using_sqlalchemy() 完毕!")

方法6:构建超长字符串,使用execute()执行相应的插入语句

def building_string(conn, df, given_table):
    print(' 正在调用在自定义函数 building_string() 写数据到数据库,请稍等...')
    df.reset_index(level=0, inplace=True) #是否把索引列转换为普通列
    
    # 根据dataframe的值创建元组列表
    tuples = [tuple(x) for x in df.to_numpy()]
    # print(len(tuples))    
    # dataframe的所有列  
    cols = ','.join(list(df.columns))    
    # print(type(cols))
    # print(df.columns)
    
    cursor = conn.cursor()
    argument_string = ",".join("('%s','%s', '%s','%s', '%s','%s', '%s','%s','%s')" % (x, y,z,o,p,q,r,s,t) for (x, y,z,o,p,q,r,s,t) in tuples)
    cursor.execute("INSERT INTO {table} VALUES".format(table=given_table) + argument_string)
    conn.commit()
    conn.close()
    print('执行 building_string() 完毕!')

6 种方法的运行时间比较

为了更好地看出各种方法运行速度的差别,执行各方法时都读取了53只股票的 1分钟线数据,共1017600条记录,个股的数据格式如下:

Python写百万条记录到PostgreSQL数据库的方法哪种最快?

执行结果如下:

  • 执行方法1:execute_values(conn, df, ‘stockmd_min1’),结果如下图:
Python写百万条记录到PostgreSQL数据库的方法哪种最快?

  • 执行方法2:copy_from_file(conn, df, ‘stockmd_min1’),结果如下图:
Python写百万条记录到PostgreSQL数据库的方法哪种最快?

  • 执行方法3:copy_from_stringio(conn, df, ‘stockmd_min1’),结果如下图:
Python写百万条记录到PostgreSQL数据库的方法哪种最快?

  • 执行方法4:execute_mogrify(conn, df, ‘stockmd_min1’),结果如下图:
Python写百万条记录到PostgreSQL数据库的方法哪种最快?

  • 执行方法5:using_sqlalchemy(df,’stockmd_min1′),结果如下图:
Python写百万条记录到PostgreSQL数据库的方法哪种最快?

  • 执行方法6:building_string(conn, df, ‘stockmd_min1’),结果如下图:
Python写百万条记录到PostgreSQL数据库的方法哪种最快?

执行第1—6方法实际耗时约为82、28、29、88、102、50秒,处理的数据都为1017600条,而笔者开发的程序化交易系统,每分钟要更新的行情数据记录大约20000条,只需要五十分之一的时间,因此第2、第3、第6种方法耗时都不超过1秒,都能满足需要。

附:完整代码如下

import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras as extras
import sys
import os
from io import StringIO,BytesIO
import time
from sqlalchemy import create_engine

# 数据库连接参数
param_dic = {
    "host"      : "localhost",
    "database"  : "tutorial",
    "user"      : "postgres",
    "password"  : "caspar"
}

def connect(params_dic):
    conn = None
    try:        
        print(' 连接到 PostgreSQL 数据库...')
        conn = psycopg2.connect(**params_dic)
    except(Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1)
    print(" 连接成功!")
    return conn

def read_dataframe(csv_file):  
    df = pd.read_csv(csv_file,index_col=0)
    df = df.rename(columns={
    "Source": "source",
    "Date": "datetime",
    "Mean": "mean_temp"
    })
    return df

def HistoryMD_mins(finalmd_filelist,file_path):    
    print('\n 正在读取 1 分钟历史数据...')
    dfmin1_list_tmp = []
    for filename_min1 in finalmd_filelist:   
        try:  
            headers = ['symbol', 'dt_occured', 'open_p', 'high_p','low_p','close_p','volume_p','amnt_p'] 
            dtypes = {'symbol':'str', 'dt_occured':'str', 'open_p':'float', 'high_p':'float','low_p':'float','close_p':'float','volume_p':'int','amnt_p':'float'}
            parse_dates = ['dt_occured']  # 字符型解析为日期型       
            df = pd.read_csv(file_path+filename_min1, sep=',', header=None, names=headers, dtype=dtypes, parse_dates=parse_dates)         
            df['dt_occured']= pd.to_datetime(df['dt_occured'],format='%Y-%m-%d %H:%M')            
            if (len(df) > 0):            
                #     df_cut = df[(len(df)-10800):]
                #     df_cut.set_index('dt_occured',inplace= True)   
                #     dfmin1_list_tmp.append(df_cut)         
                # else:      
                #     #print(f'{filename_min1} 的长度:{len(df)}') 
                # df.set_index('dt_occured',inplace= True)        
                dfmin1_list_tmp.append(df)
            else:
                pass
        except:        
            print(' 报错 - min1:',filename_min1)
    return dfmin1_list_tmp

def execute_values(conn, df, table):
    print(' 正在调用函数 execute_values() 写数据到数据库,请稍等...')
    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(list(df.columns))
   
    cursor = conn.cursor()
    cursor.execute("Truncate {} Cascade;".format(table))
    query  = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)    
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 execute_values() 完毕!")
    cursor.close()

def copy_from_file(conn, df, table):
    print(' 正在调用自定义函数 copy_from_file() 写数据到数据库,请稍等...')
    
    tmp_df = "tmp_dataframe.csv"
    df.to_csv(tmp_df, index_label='id', header=False)
    f = open(tmp_df, 'r')    
    cursor = conn.cursor()
    cursor.execute("Truncate {} Cascade;".format(table))
    try:
        cursor.copy_from(f, table, sep=",",null ='\\N',size = 32768,columns = None )
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        # os.remove(tmp_df)
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 copy_from_file() 完毕!")
    cursor.close()
    # os.remove(tmp_df) # 本行不注释会报错

def copy_from_stringio(conn, df, table):   
    print(' 正在调用自定义函数 copy_from_stringio() 写数据到数据库,请稍等...')
   
    buffer = StringIO()
    df.to_csv(buffer, index_label='id', header=False)
    buffer.seek(0)
    
    cursor = conn.cursor()
    try:
        cursor.copy_from(buffer, table, sep=",",null ='\\N',size = 32768,columns = None )
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 copy_from_stringio() 完毕!")
    cursor.close()


def execute_mogrify(conn, df, table):    
    print(' 正在调用自定义函数 execute_mogrify() 写数据到数据库,请稍等...')
    df.reset_index(level=0, inplace=False) #是否把索引列转换为普通列
    
    tuples = [tuple(x) for x in df.to_numpy()]    
    cols = ','.join(list(df.columns))    
   
    cursor = conn.cursor()    
    values = [cursor.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s)", tup).decode('utf8') for tup in tuples]
    query  = "INSERT INTO %s(%s) VALUES " % (table, cols) + ",".join(values)
    
    try:
        cursor.execute(query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(" 报错: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(" 执行 execute_mogrify() 完毕!")
    cursor.close()

def using_sqlalchemy(df,md_min1):    
    print(' 正在调用自定义函数 using_sqlalchemy() 写数据到数据库,请稍等...')
    connect4sqlalchemy = "postgresql+psycopg2://postgres:caspar@localhost:5432/tutorial"
    
    engine = create_engine(connect4sqlalchemy)
    df.to_sql(
        md_min1, 
        con=engine, 
        index=True, 
        if_exists='replace'
    )
    print(" 执行 using_sqlalchemy() 完毕!")


def building_string(conn, df, given_table):
    print(' 正在调用在自定义函数 building_string() 写数据到数据库,请稍等...')
    df.reset_index(level=0, inplace=True) #是否把索引列转换为普通列
    
    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(list(df.columns))    
    cursor = conn.cursor()
    argument_string = ",".join("('%s','%s', '%s','%s', '%s','%s', '%s','%s','%s')" % (x, y,z,o,p,q,r,s,t) for (x, y,z,o,p,q,r,s,t) in tuples)
    cursor.execute("INSERT INTO {table} VALUES".format(table=given_table) + argument_string)
    conn.commit()
    conn.close()
    print('执行 building_string() 完毕!')

if __name__ == "__main__":   
    T0_start = time.perf_counter() 
    ## df2: 读取1分钟行情数据
    finalmd_filelist = ['SZ#002142.csv','SH#600036.csv','SH#601318.csv','SH#601166.csv','SH#600919.csv','SH#601128.csv',
                        'SH#601838.csv','SZ#000001.csv','SH#601658.csv','SH#601788.csv','SH#600999.csv','SZ#300059.csv',
                        'SH#600030.csv','SH#601688.csv','SH#600837.csv','SH#601211.csv','SH#601377.csv','SZ#000776.csv',
                        'SZ#300003.csv','SH#601016.csv','SH#603658.csv','SH#600570.csv','SZ#000983.csv','SZ#002128.csv',
                        'SZ#000791.csv','SZ#000155.csv','SH#600900.csv','SZ#002039.csv','SH#601899.csv','SH#600452.csv',
                        'SH#600021.csv','SH#600116.csv','SZ#000993.csv','SZ#000591.csv','SZ#000875.csv','SH#600011.csv',
                        'SH#600821.csv','SH#600163.csv','SH#601985.csv','SH#601778.csv','SZ#000966.csv','SZ#002608.csv',
                        'SH#600025.csv','SH#600236.csv','SZ#000060.csv','SH#600111.csv','SZ#000831.csv','SZ#300748.csv',
                        'SZ#000629.csv','SH#600259.csv','SH#600507.csv','SZ#000878.csv','SZ#000630.csv']  
    tmpdf_list= HistoryMD_mins(finalmd_filelist,"D:\\SecurityData\\MD_1min\\")
   
    df = pd.concat(tmpdf_list)    
    T0_end = time.perf_counter()       
    print(f'\n 读入 {len(finalmd_filelist)} 个股票 1 分钟行情数据的文本文件,耗时: {round(T0_end-T0_start,2)} 秒\n')  

    conn = connect(param_dic)
    
    ## 分别运行6种方法
    # execute_values(conn, df, 'stockmd_min1') 
    # copy_from_file(conn, df, 'stockmd_min1')    
    # copy_from_stringio(conn, df, 'stockmd_min1')  
    execute_mogrify(conn, df, 'stockmd_min1')   
    # using_sqlalchemy(df,'stockmd_min1')   
    # building_string(conn, df, 'stockmd_min1')
  
    conn.close()

    T1_end = time.perf_counter()
        
    print(f'\n 写入 {len(df)} 条行情数据到数据库,耗时: {round(T1_end-T0_end,2)} 秒\n')  

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/76187
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!

(0)
股市刺客的头像股市刺客
上一篇 2024 年 7 月 12 日
下一篇 2024 年 7 月 12 日

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注