Python和PostgreSQL数据库协同工作,需要用到psycopg2工具库,这也是Python编程中最常用的接口。但是,为了适应不同的写入速度,psycopg2工具库提供了多种执行写入的方法,常用的有以下四种:
- execute()
- executemany()
- execute_batch()
- 构建个性化的字符串
下面对这四种方法的执行写数据到PostgreSQL数据表的速度做一个比较,看看插入10000条、100000条、1000000条记录,以上各种方法各自耗时多少。
CREATE TABLE IF NOT EXISTS public.test_table ( source TEXT, datetime TIMESTAMP, mean_temp NUMERIC );
自定义的计时函数:
def measure_time(func):
def time_it(*args, **kwargs):
time_started = time.time()
func(*args, **kwargs)
time_elapsed = time.time()
print("{execute} 耗时 {sec} 秒,插入 {rows} 行记录".format(execute=func.__name__,
sec=round(
time_elapsed - time_started,
4), rows=len(
kwargs.get('values'))))
return time_it
方法1:execute()
此方法是执行数据库操作的标准函数,为了插入大批量的数据行,就必须要针对数据使用循环,并且调用该方法,一行一行地把数据插入到数据库的表中。执行execute()的方法代码如下:
def method_execute(self, values):
for value in values:
self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value)
self.connection.commit()
众所周知,执行循环是很慢的,插入1万条、10万条、100万条数据的执行结果如下:

可以看出,数据越多,花费的时间越长,两者基本上是线性关系。
方法2:executemany()
Psycopg2提供的executemany()方法,并不是对execute()方法的简单改进,执行executemany()的方法代码如下:
@measure_time
def method_execute_many(self, values):
self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values)
self.connection.commit()
下图是执行不同的数据量所耗费的时间,结果如下图:

可以看出,方法1与方法2耗时并没有明显的差别。
方法3:execute_batch()
这是Psycopg2提供的又一种方法,它减少了访问数据库服务器的次数,与executemany()相比,性能有了很大的改进。这种方法把许多语句拼接在一起,只受到page_size的限制(PostgreSQL中一般是8kB),执行execute_batch()的方法代码如下:
@measure_time
def method_execute_batch(self, values):
psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME),
values)
self.connection.commit()
插入不同长度的数据,耗时结果如下图:

可以看出,这种方法的执行速度,比execute()方法和executemany()方法提高了将近4倍!
方法4:构建个性化的字符串
这种方法是构建一个个性化的字符串,然后用一条execute()语句来执行,代码如下:
@measure_time
def method_string_building(self, values):
argument_string = ",".join("('%s', '%s')" % (x, y) for (x, y) in values)
self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string)
self.connection.commit()
看看这种方法的运行时间,结果如下图:

从图中的结果可以看出,这种方法是最快的,比方法1、方法2提高了20多倍,比方法3提高了3倍!
本文完整代码如下:
import time
import psycopg2
import psycopg2.extras
TABLE_NAME = 'TestTable'
def measure_time(func):
def time_it(*args, **kwargs):
time_started = time.time()
func(*args, **kwargs)
time_elapsed = time.time()
print("{execute} 耗时 {sec} 秒,插入 {rows} 行记录".format(execute=func.__name__,
sec=round(time_elapsed - time_started,4),
rows=len(kwargs.get('values'))))
return time_it
class PsycopgTest():
def __init__(self, num_rows):
self.num_rows = num_rows
def create_dummy_data(self):
values = []
for i in range(self.num_rows):
values.append((i + 1, 'test'))
return values
def connect(self):
conn_string = "host={0} user={1} dbname={2} password={3}".format('localhost',
'postgres',
'tutorial', 'caspar')
self.connection = psycopg2.connect(conn_string)
self.cursor = self.connection.cursor()
def create_table(self):
self.cursor.execute(
"CREATE TABLE IF NOT EXISTS {table} (id INT PRIMARY KEY, NAME text)".format(table=TABLE_NAME))
self.connection.commit()
def truncate_table(self):
self.cursor.execute("TRUNCATE TABLE {table} RESTART IDENTITY".format(table=TABLE_NAME))
self.connection.commit()
@measure_time
def method_execute(self, values):
"""Loop over the dataset and insert every row separately"""
for value in values:
self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value)
self.connection.commit()
@measure_time
def method_execute_many(self, values):
self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values)
self.connection.commit()
@measure_time
def method_execute_batch(self, values):
psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME),
values)
self.connection.commit()
@measure_time
def method_string_building(self, values):
argument_string = ",".join("('%s', '%s')" % (x, y) for (x, y) in values)
self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string)
self.connection.commit()
def main():
psyco = PsycopgTest(10000)
psyco.connect()
values = psyco.create_dummy_data()
psyco.create_table()
psyco.truncate_table()
psyco.method_execute(values=values)
# psyco.method_execute_many(values=values)
# psyco.method_execute_batch(values=values)
# psyco.method_string_building(values=values)
if __name__ == '__main__':
main()
发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/76189
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!