问题:使用SQLAlchemy ORM批量插入
有什么方法可以让SQLAlchemy进行批量插入,而不是插入每个对象。即
在做:
INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
而不是:
INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)
我刚刚将一些代码转换为使用sqlalchemy而不是原始sql,尽管现在使用它起来要好得多,但现在似乎要慢一些(最多10倍),我想知道这是否是原因。
也许我可以更有效地使用会话来改善这种情况。目前,我已经添加了一些东西,autoCommit=False
并做了一个session.commit()
。尽管如果在其他地方更改了数据库,这似乎会使数据过时,例如,即使我执行新查询,我仍然可以返回旧结果?
谢谢你的帮助!
回答 0
SQLAlchemy在版本中引入了该功能1.0.0
:
通过这些操作,您现在可以批量插入或更新!
例如,您可以执行以下操作:
s = Session()
objects = [
User(name="u1"),
User(name="u2"),
User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()
在这里,将制成大量插入物。
回答 1
sqlalchemy文档对可用于批量插入的各种技术的性能进行了总结:
ORM基本上不是用于高性能批量插入的-这是SQLAlchemy除了将ORM作为一流组件之外还提供Core的全部原因。
对于快速批量插入的用例,ORM所基于的SQL生成和执行系统是Core的一部分。直接使用该系统,我们可以产生与直接使用原始数据库API相比具有竞争力的INSERT。
另外,SQLAlchemy ORM提供了Bulk Operations方法套件,该套件提供了到工作单元过程各部分的挂钩,以便发出基于ORM的自动化程度较低的Core级INSERT和UPDATE构造。
下面的示例说明了基于时间的测试,该测试针对从自动程度最高到最少的几种不同的行插入方法。使用cPython 2.7,可以观察到运行时:
classics-MacBook-Pro:sqlalchemy classic$ python test.py SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs sqlite3: Total time for 100000 records 0.487842082977 sec
脚本:
import time import sqlite3 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import scoped_session, sessionmaker Base = declarative_base() DBSession = scoped_session(sessionmaker()) engine = None class Customer(Base): __tablename__ = "customer" id = Column(Integer, primary_key=True) name = Column(String(255)) def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'): global engine engine = create_engine(dbname, echo=False) DBSession.remove() DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) def test_sqlalchemy_orm(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer() customer.name = 'NAME ' + str(i) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_pk_given(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer(id=i+1, name="NAME " + str(i)) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM pk given: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_bulk_insert(n=100000): init_sqlalchemy() t0 = time.time() n1 = n while n1 > 0: n1 = n1 - 10000 DBSession.bulk_insert_mappings( Customer, [ dict(name="NAME " + str(i)) for i in xrange(min(10000, n1)) ] ) DBSession.commit() print( "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_core(n=100000): init_sqlalchemy() t0 = time.time() engine.execute( Customer.__table__.insert(), [{"name": 'NAME ' + str(i)} for i in xrange(n)] ) print( "SQLAlchemy Core: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def init_sqlite3(dbname): conn = sqlite3.connect(dbname) c = conn.cursor() c.execute("DROP TABLE IF EXISTS customer") c.execute( "CREATE TABLE customer (id INTEGER NOT NULL, " "name VARCHAR(255), PRIMARY KEY(id))") conn.commit() return conn def test_sqlite3(n=100000, dbname='sqlite3.db'): conn = init_sqlite3(dbname) c = conn.cursor() t0 = time.time() for i in xrange(n): row = ('NAME ' + str(i),) c.execute("INSERT INTO customer (name) VALUES (?)", row) conn.commit() print( "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec") if __name__ == '__main__': test_sqlalchemy_orm(100000) test_sqlalchemy_orm_pk_given(100000) test_sqlalchemy_orm_bulk_insert(100000) test_sqlalchemy_core(100000) test_sqlite3(100000)
回答 2
据我所知,没有办法让ORM发出批量插入。我认为根本原因是SQLAlchemy需要跟踪每个对象的身份(即新的主键),而大容量插入会对此产生干扰。例如,假设您的foo
表包含一id
列并映射到一个Foo
类:
x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1
由于SQLAlchemy在x.id
不发出另一个查询的情况下获取了该值,因此我们可以推断出它直接从该INSERT
语句中获取了该值。如果不需要随后通过相同实例访问创建的对象,则可以跳过ORM层进行插入:
Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))
SQLAlchemy无法将这些新行与任何现有对象匹配,因此您必须重新查询它们以进行任何后续操作。
至于过时的数据,记住该会话没有内置的方式来了解何时在会话外更改数据库是很有帮助的。为了通过现有实例访问外部修改的数据,必须将这些实例标记为expired。默认情况下会发生这种情况session.commit()
,但可以通过调用session.expire_all()
或手动完成session.expire(instance)
。一个例子(省略SQL):
x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42
session.commit()
expires x
,因此第一个打印语句隐式打开一个新事务并重新查询x
属性。如果注释掉第一个打印语句,您会注意到第二个打印语句现在会选择正确的值,因为直到更新后才会发出新查询。
从事务隔离的角度来看,这是有道理的-您只应在事务之间进行外部修改。如果这给您带来麻烦,建议您弄清或重新考虑应用程序的事务边界,而不要立即进行操作session.expire_all()
。
回答 3
我通常使用add_all
。
from app import session
from models import User
objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
回答 4
从0.8版开始,直接支持已添加到SQLAlchemy
根据docs,connection.execute(table.insert().values(data))
应该可以解决问题。(请注意,这是不一样的connection.execute(table.insert(), data)
通过将呼叫这导致许多个别行插入executemany
)。除了本地连接之外,其他任何方面的性能差异都可能很大。
回答 5
SQLAlchemy在版本中引入了该功能1.0.0
:
通过这些操作,您现在可以批量插入或更新!
例如(如果您希望简单表INSERT的开销最小),可以使用Session.bulk_insert_mappings()
:
loadme = [(1, 'a'),
(2, 'b'),
(3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]
s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()
或者,如果需要,可以跳过loadme
元组,直接将字典写进去dicts
(但是我发现将所有的单词遗漏在数据之外并循环加载字典列表会更容易)。
回答 6
Piere的回答是正确的,但是一个问题是bulk_save_objects
,如果您担心的话,默认情况下不会返回对象的主键。设置return_defaults
为True
可得到此行为。
文档在这里。
foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
assert foo.id is not None
session.commit()
回答 7
条条大路通罗马,但其中一些横穿山脉,需要渡轮,但是如果您想快速到达那儿,只需上高速公路。
在这种情况下,高速公路将使用psycopg2的execute_batch()功能。该文档说的最好:
当前的实现executemany()
(使用非常慈善的轻描淡写)不是特别有效。这些功能可用于加快针对一组参数的语句的重复执行。通过减少服务器往返次数,性能可以比使用服务器好几个数量级。executemany()
。
在我自己的测试execute_batch()
是快2倍左右为executemany()
,并给出配置进行进一步的调整所以page_size的选项(如果你想挤进业绩的最后2-3%的驾驶者)。
如果使用SQLAlchemy,则可以通过use_batch_mode=True
在实例化引擎时将其设置为参数来轻松启用相同功能。create_engine()
回答 8
这是一种方法:
values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])
这样插入:
INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
参考:SQLAlchemy FAQ包含各种提交方法的基准。
回答 9
到目前为止,我发现的最佳答案是在sqlalchemy文档中:
有一个完整示例说明了可能的解决方案基准。
如文档所示:
bulk_save_objects不是最佳解决方案,但其性能是正确的。
就可读性而言,第二好的实现是我认为使用SQLAlchemy Core:
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in xrange(n)]
)
文档文章中提供了此功能的上下文。