Bulk insert with SQLAlchemy ORM

Question: Bulk insert with SQLAlchemy ORM

Is there any way to get SQLAlchemy to do a bulk insert rather than inserting each individual object? I.e.,

doing:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

rather than:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

I’ve just converted some code to use SQLAlchemy rather than raw SQL, and although it is now much nicer to work with, it seems to be slower (by up to a factor of 10). I’m wondering if this is the reason.

Maybe I could improve the situation by using sessions more efficiently. At the moment I have autoCommit=False and do a session.commit() after I’ve added some stuff. This seems to cause the data to go stale if the DB is changed elsewhere, though: even if I issue a new query, I still get old results back?

Thanks for your help!


Answer 0

SQLAlchemy introduced that in version 1.0.0:

Bulk operations – SQLAlchemy docs

With these operations, you can now do bulk inserts or updates!

For instance, you can do:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Here, a bulk insert will be made.
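
For completeness, here is a minimal sketch of the mapped class and session factory that the snippet above assumes (the model, table name, and in-memory engine are illustrative, not part of the original answer):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"  # hypothetical table name
    id = Column(Integer, primary_key=True)
    name = Column(String(50))

engine = create_engine("sqlite://")  # in-memory database keeps the sketch self-contained
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)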


Answer 1

The sqlalchemy docs have a writeup on the performance of various techniques that can be used for bulk inserts:

ORMs are basically not intended for high-performance bulk inserts – this is the whole reason SQLAlchemy offers the Core in addition to the ORM as a first-class component.

For the use case of fast bulk inserts, the SQL generation and execution system that the ORM builds on top of is part of the Core. Using this system directly, we can produce an INSERT that is competitive with using the raw database API directly.

Alternatively, the SQLAlchemy ORM offers the Bulk Operations suite of methods, which provide hooks into subsections of the unit of work process in order to emit Core-level INSERT and UPDATE constructs with a small degree of ORM-based automation.

The example below illustrates time-based tests for several different methods of inserting rows, going from the most automated to the least. With cPython 2.7, runtimes observed:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_insert_mappings(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Script:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    # Insert in chunks of at most 10000 mappings per call
    n1 = n
    while n1 > 0:
        chunk = min(10000, n1)
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(chunk)
            ]
        )
        n1 = n1 - chunk
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_insert_mappings(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
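
One caveat if you run this script today: it targets cPython 2.7 and uses xrange, which was removed in Python 3. A small compatibility shim at the top of the script is enough to make it run under either interpreter (a sketch):

import sys

# xrange was removed in Python 3; alias it to range so the
# script runs unchanged under both interpreters
if sys.version_info[0] >= 3:
    xrange = range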

Answer 2

As far as I know, there is no way to get the ORM to issue bulk inserts. I believe the underlying reason is that SQLAlchemy needs to keep track of each object’s identity (i.e., new primary keys), and bulk inserts interfere with that. For example, assuming your foo table contains an id column and is mapped to a Foo class:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Since SQLAlchemy picked up the value for x.id without issuing another query, we can infer that it got the value directly from the INSERT statement. If you don’t need subsequent access to the created objects via the same instances, you can skip the ORM layer for your insert:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy can’t match these new rows with any existing objects, so you’ll have to query them anew for any subsequent operations.

As far as stale data is concerned, it’s helpful to remember that the session has no built-in way to know when the database is changed outside of the session. In order to access externally modified data through existing instances, the instances must be marked as expired. This happens by default on session.commit(), but can be done manually by calling session.expire_all() or session.expire(instance). An example (SQL omitted):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit() expires x, so the first print statement implicitly opens a new transaction and re-queries x's attributes. If you comment out the first print statement, you'll notice that the second one now picks up the correct value, because the new query isn't emitted until after the update.

This makes sense from the point of view of transactional isolation – you should only pick up external modifications between transactions. If this is causing you trouble, I’d suggest clarifying or re-thinking your application’s transaction boundaries instead of immediately reaching for session.expire_all().


Answer 3

I usually do it using add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
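
Note that add_all goes through the regular unit-of-work flush, so by itself it does not guarantee a single multi-row INSERT; it mainly saves you the repeated add() calls. A sketch of what it is shorthand for:

# Roughly equivalent to session.add_all(objects): each object is
# registered with the unit of work and flushed on commit
for obj in objects:
    session.add(obj)
session.commit()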

Answer 4

Direct support was added to SQLAlchemy as of version 0.8.

As per the docs, connection.execute(table.insert().values(data)) should do the trick. (Note that this is not the same as connection.execute(table.insert(), data), which results in many individual row inserts via a call to executemany.) On anything but a local connection, the difference in performance can be enormous.
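
A minimal sketch of the difference, using an illustrative foo table and an in-memory engine (neither is from the original answer):

from sqlalchemy import create_engine, MetaData, Table, Column, Integer

engine = create_engine("sqlite://")
metadata = MetaData()
foo = Table("foo", metadata, Column("bar", Integer))
metadata.create_all(engine)

data = [{"bar": 1}, {"bar": 2}, {"bar": 3}]

with engine.connect() as connection:
    # Multi-row VALUES clause: a single INSERT statement for all rows
    connection.execute(foo.insert().values(data))
    # executemany: the INSERT is executed once per parameter set
    connection.execute(foo.insert(), data)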


Answer 5

SQLAlchemy introduced that in version 1.0.0:

Bulk operations – SQLAlchemy docs

With these operations, you can now do bulk inserts or updates!

For instance (if you want the lowest overhead for simple table INSERTs), you can use Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Or, if you want, skip the loadme tuples and write the dictionaries directly into dicts (but I find it easier to leave all the wordiness out of the data and load up a list of dictionaries in a loop).
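
For reference, the snippet assumes a mapped class with bar and fly columns along these lines (illustrative; it reuses the declarative Base and imports from the benchmark script above):

class Foo(Base):
    __tablename__ = "foo"
    id = Column(Integer, primary_key=True)
    bar = Column(Integer)     # filled from the first tuple element
    fly = Column(String(50))  # filled from the second tuple element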


Answer 6

Pierre’s answer is correct, but one issue is that bulk_save_objects by default does not return the primary keys of the objects, if that is of concern to you. Set return_defaults to True to get this behavior (per the SQLAlchemy docs, this greatly reduces the performance gains, since the rows must then be inserted one at a time to fetch the generated keys).

The documentation is here.

foos = [Foo(bar='a'), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()

Answer 7

All roads lead to Rome, but some of them cross mountains and require ferries. If you want to get there quickly, just take the motorway.


In this case, the motorway is the execute_batch() feature of psycopg2. The documentation says it best:

The current implementation of executemany() is (using an extremely charitable understatement) not particularly performing. These functions can be used to speed up the repeated execution of a statement against a set of parameters. By reducing the number of server roundtrips the performance can be orders of magnitude better than using executemany().

In my own tests, execute_batch() is approximately twice as fast as executemany(), and it gives the option to configure page_size for further tweaking (if you want to squeeze the last 2-3% of performance out of the driver).

If you are using SQLAlchemy, the same feature can easily be enabled by setting use_batch_mode=True as a parameter when you instantiate the engine with create_engine().
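
A sketch of both routes; the DSN and URL below are placeholders, and use_batch_mode is the SQLAlchemy 1.2-era flag for the psycopg2 dialect (later superseded by executemany_mode):

import psycopg2
from psycopg2.extras import execute_batch
from sqlalchemy import create_engine

# Plain psycopg2: batch many INSERTs into few server roundtrips
conn = psycopg2.connect("dbname=mydb user=me")  # placeholder DSN
with conn.cursor() as cur:
    execute_batch(
        cur,
        "INSERT INTO foo (bar) VALUES (%s)",
        [(1,), (2,), (3,)],
        page_size=100,  # statements sent per roundtrip; tune as needed
    )
conn.commit()

# SQLAlchemy: route executemany() calls through execute_batch()
engine = create_engine(
    "postgresql+psycopg2://me:secret@localhost/mydb",  # placeholder URL
    use_batch_mode=True,
)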


Answer 8

Here is one way:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

This will insert like this:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Reference: The SQLAlchemy FAQ includes benchmarks for various commit methods.
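
Note that Foo.__table__.insert().execute(...) relies on implicit execution, i.e. the table's metadata being bound to an engine. A minimal sketch of the same insert through an explicit connection (engine stands for whatever Engine the table was created against):

with engine.connect() as conn:
    conn.execute(Foo.__table__.insert(), [{'bar': x} for x in values])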


Answer 9

The best answer I found so far was in the sqlalchemy documentation:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

It contains a complete benchmark example of the possible solutions.

As shown in the documentation:

bulk_save_objects is not the best solution, but its performance is acceptable.

The second-best implementation in terms of readability, I think, was with the SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

The context of this function is given in the documentation article (the full benchmark script is also reproduced in Answer 1 above).