psycopg2:通过一个查询插入多行

问题:psycopg2:通过一个查询插入多行

我需要用一个查询插入多行(行数不是常数),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我知道的唯一方法是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要一些更简单的方法。

I need to insert multiple rows with one query (number of rows is not constant), so I need to execute query like this one:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

The only way I know is

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

but I want some simpler way.


回答 0

我构建了一个程序,该程序将多行插入到位于另一个城市的服务器上。

我发现使用这种方法的速度大约是10倍executemany。就我而言,tup是一个包含约2000行的元组。使用此方法大约花了10秒钟:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

使用此方法需要2分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)

I built a program that inserts multiple lines to a server that was located in another city.

I found out that using this method was about 10 times faster than executemany. In my case tup is a tuple containing about 2000 rows. It took about 10 seconds when using this method:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

and 2 minutes when using this method:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)

回答 1

Psycopg 2.7中的新execute_values方法

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)

在Psycopg 2.6中执行此操作的pythonic方法:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)

说明:如果要插入的数据作为元组列表给出,例如

data = [(1,'x'), (2,'y')]

那么它已经是确切要求的格式了

  1. 该子句的values语法insert需要一个记录列表,如

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopg使Python适应tuplePostgresql record

唯一必要的工作是提供一个由psycopg填充的记录列表模板

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

并将其放在insert查询中

insert_query = 'insert into t (a, b) values {}'.format(records_list_template)

打印insert_query输出

insert into t (a, b) values %s,%s

现在到通常的Psycopg参数替换

cursor.execute(insert_query, data)

或者只是测试将要发送到服务器的内容

print (cursor.mogrify(insert_query, data).decode('utf8'))

输出:

insert into t (a, b) values (1, 'x'),(2, 'y')

New execute_values method in Psycopg 2.7:

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)

The pythonic way of doing it in Psycopg 2.6:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)

Explanation: If the data to be inserted is given as a list of tuples like in

data = [(1,'x'), (2,'y')]

then it is already in the exact required format as

  1. the values syntax of the insert clause expects a list of records as in

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopg adapts a Python tuple to a Postgresql record.

The only necessary work is to provide a records list template to be filled by psycopg

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

and place it in the insert query

insert_query = 'insert into t (a, b) values {}'.format(records_list_template)

Printing the insert_query outputs

insert into t (a, b) values %s,%s

Now to the usual Psycopg arguments substitution

cursor.execute(insert_query, data)

Or just testing what will be sent to the server

print (cursor.mogrify(insert_query, data).decode('utf8'))

Output:

insert into t (a, b) values (1, 'x'),(2, 'y')

回答 2

使用psycopg2 2.7更新:

executemany()以下线程中所述,经典版本比@ ant32的实现(称为“折叠”)慢约60倍:https ://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

此实现已在2.7版中添加到psycopg2中,称为execute_values()

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

上一个答案:

要插入多行,使用multirow VALUES语法execute()比使用psycopg2快约10倍executemany()。确实,executemany()只是运行许多单独的INSERT语句。

@ ant32的代码在Python 2中可以完美地工作。但是在Python 3中,cursor.mogrify()返回字节,cursor.execute()采用字节或字符串并','.join()需要str实例。

因此,在Python 3中,您可能需要通过添加.decode('utf-8')以下内容来修改@ ant32的代码:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

或仅使用字节(带有b''b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

Update with psycopg2 2.7:

The classic executemany() is about 60 times slower than @ant32 ‘s implementation (called “folded”) as explained in this thread: https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

This implementation was added to psycopg2 in version 2.7 and is called execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

Previous Answer:

To insert multiple rows, using the multirow VALUES syntax with execute() is about 10x faster than using psycopg2 executemany(). Indeed, executemany() just runs many individual INSERT statements.

@ant32 ‘s code works perfectly in Python 2. But in Python 3, cursor.mogrify() returns bytes, cursor.execute() takes either bytes or strings, and ','.join() expects str instance.

So in Python 3 you may need to modify @ant32 ‘s code, by adding .decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

Or by using bytes (with b'' or b"") only:

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

回答 3

cursor.copy_from是迄今为止我发现的用于批量插入的最快解决方案。这是编写的要点,其中包含一个名为IteratorFile的类,该类允许迭代器产生像文件一样读取字符串。我们可以使用生成器表达式将每个输入记录转换为字符串。所以解决方案是

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

对于这个琐碎的args来说,速度差异不会太大,但是在处理数千行时,我看到了很大的加速。与构建巨大的查询字符串相比,它还将提高内存效率。迭代器一次只能在内存中保存一个输入记录,在某个时候,通过构建查询字符串,您的Python进程或Postgres中的内存将用完。

cursor.copy_from is the fastest solution I’ve found for bulk inserts by far. Here’s a gist I made containing a class named IteratorFile which allows an iterator yielding strings to be read like a file. We can convert each input record to a string using a generator expression. So the solution would be

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

For this trivial size of args it won’t make much of a speed difference, but I see big speedups when dealing with thousands+ of rows. It will also be more memory efficient than building a giant query string. An iterator would only ever hold one input record in memory at a time, where at some point you’ll run out of memory in your Python process or in Postgres by building the query string.


回答 4

来自Postgresql.org的 Psycopg2的教程页面的摘要(请参见底部)

我想向您展示的最后一项是如何使用字典插入多行。如果您具有以下条件:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

您可以使用以下命令轻松地将所有三行插入字典中:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

它不会节省太多代码,但是绝对可以看起来更好。

A snippet from Psycopg2’s tutorial page at Postgresql.org (see bottom):

A last item I would like to show you is how to insert multiple rows using a dictionary. If you had the following:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

You could easily insert all three rows within the dictionary by using:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

It doesn’t save much code, but it definitively looks better.


回答 5

所有这些技术在Postgres术语中都称为“扩展插入”,截至2016年11月24日,它仍然比psychopg2的executemany()和该线程中列出的所有其他方法快了很多(我在尝试此方法之前曾尝试过)回答)。

这是一些不使用cur.mogrify的代码,它很不错,只是可以帮助您:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

但应注意,如果可以使用copy_from(),则应该使用copy_from;)

All of these techniques are called ‘Extended Inserts” in Postgres terminology, and as of the 24th of November 2016, it’s still a ton faster than psychopg2’s executemany() and all the other methods listed in this thread (which i tried before coming to this answer).

Here’s some code which doesnt use cur.mogrify and is nice and simply to get your head around:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

But it should be noted that if you can use copy_from(), you should use copy_from ;)


回答 6

几年来我一直在使用ant32的答案。但是我发现这在python 3中解决了一个错误,因为它mogrify返回了一个字节字符串。

显式转换为bytse字符串是使代码与python 3兼容的简单解决方案。

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)

I’ve been using ant32’s answer above for several years. However I’ve found that is thorws an error in python 3 because mogrify returns a byte string.

Converting explicitly to bytse strings is a simple solution for making code python 3 compatible.

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)

回答 7

另一种有效的好方法-将要插入的行作为1个参数传递,它是json对象的数组。

例如,您传递的参数:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

它是数组,其中可以包含任意数量的对象。然后您的SQL看起来像:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

注意:您的postgress必须足够新,才能支持json

Another nice and efficient approach – is to pass rows for insertion as 1 argument, which is array of json objects.

E.g. you passing argument:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

It is array, which may contain any amount of objects inside. Then your SQL looks like:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

Notice: Your postgress must be new enough, to support json


回答 8

上面(https://stackoverflow.com/a/30721460/11100064)的@ jopseph.sheedyhttps://stackoverflow.com/users/958118/joseph-sheedy)提供的cursor.copyfrom解决方案确实快如闪电。

但是,他给出的示例不适用于具有任意多个字段的记录,因此我花了一些时间才弄清楚如何正确使用它。

IteratorFile需要使用这样的制表符分隔的字段实例化(r是一个字典列表,其中每个字典是一条记录):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

为了概括任意数量的字段,我们将首先创建一个带有正确数量的制表符和字段占位符的行字符串:"{}\t{}\t{}....\t{}"然后使用.format()来填充字段值 *list(r.values())) for r in records::

        line = "\t".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

完整的功能在这里

The cursor.copyfrom solution as provided by @jopseph.sheedy (https://stackoverflow.com/users/958118/joseph-sheedy) above (https://stackoverflow.com/a/30721460/11100064) is indeed lightning fast.

However, the example he gives are not generically usable for a record with any number of fields and it took me while to figure out how to use it correctly.

The IteratorFile needs to be instantiated with tab-separated fields like this (r is a list of dicts where each dict is a record):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

To generalise for an arbitrary number of fields we will first create a line string with the correct amount of tabs and field placeholders : "{}\t{}\t{}....\t{}" and then use .format() to fill in the field values : *list(r.values())) for r in records:

        line = "\t".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

complete function in gist here.


回答 9

如果您使用的是SQLAlchemy,则无需手工处理字符串,因为SQLAlchemy 支持VALUES为单个INSERT语句生成多行子句

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

If you’re using SQLAlchemy, you don’t need to mess with hand-crafting the string because SQLAlchemy supports generating a multi-row VALUES clause for a single INSERT statement:

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

回答 10

自发布此问题以来,execute_batch已添加到psycopg2。

它比execute_values慢,但使用起来更简单。

execute_batch has been added to psycopg2 since this question was posted.

It is slower than execute_values but simpler to use.


回答 11

executemany接受元组数组

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

executemany accept array of tuples

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

回答 12

如果要在一个插入状态表中插入多行(假设您未使用ORM),那么到目前为止,对我来说最简单的方法是使用词典列表。这是一个例子:

 t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
      {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
      {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

如您所见,将仅执行一个查询:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT

If you want to insert multiple rows within one insert statemens (assuming you are not using ORM) the easiest way so far for me would be to use list of dictionaries. Here is an example:

 t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
      {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
      {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

As you can see only one query will be executed:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT

回答 13

使用aiopg-下面的代码片段效果很好

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)

Using aiopg – The snippet below works perfectly fine

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)

回答 14

最终,在SQLalchemy1.2版本中,此新实现被添加为在使用use_batch_mode = True初始化引擎时使用psycopg2.extras.execute_batch()而不是executemany,例如:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

然后,将不得不使用SQLalchmey的人不会费心尝试sqla和psycopg2的不同组合并直接将SQL一起使用。

Finally in SQLalchemy1.2 version, this new implementation is added to use psycopg2.extras.execute_batch() instead of executemany when you initialize your engine with use_batch_mode=True like:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

Then someone would have to use SQLalchmey won’t bother to try different combinations of sqla and psycopg2 and direct SQL together..