标签归档:sqlalchemy

使用SQLAlchemy ORM高效地更新数据库

问题:使用SQLAlchemy ORM高效地更新数据库

我正在启动一个新应用程序,并考虑使用ORM,尤其是SQLAlchemy。

假设我的数据库中有一列“ foo”,我想增加它。在直通sqlite中,这很容易:

db = sqlite3.connect('mydata.sqlitedb')
cur = db.cursor()
cur.execute('update table stuff set foo = foo + 1')

我弄清楚了SQLAlchemy SQL-builder等效项:

engine = sqlalchemy.create_engine('sqlite:///mydata.sqlitedb')
md = sqlalchemy.MetaData(engine)
table = sqlalchemy.Table('stuff', md, autoload=True)
upd = table.update(values={table.c.foo:table.c.foo+1})
engine.execute(upd)

这稍微慢一点,但是没有太多。

这是我对SQLAlchemy ORM方法的最佳猜测:

# snip definition of Stuff class made using declarative_base
# snip creation of session object
for c in session.query(Stuff):
    c.foo = c.foo + 1
session.flush()
session.commit()

这样做是正确的,但所需的时间是其他两种方法的近50倍。我认为这是因为它必须先将所有数据带入内存,然后才能使用它。

有什么方法可以使用SQLAlchemy的ORM生成高效的SQL?还是使用其他任何Python ORM?还是我应该回到手工编写SQL?

I’m starting a new application and looking at using an ORM — in particular, SQLAlchemy.

Say I’ve got a column ‘foo’ in my database and I want to increment it. In straight sqlite, this is easy:

db = sqlite3.connect('mydata.sqlitedb')
cur = db.cursor()
cur.execute('update table stuff set foo = foo + 1')

I figured out the SQLAlchemy SQL-builder equivalent:

engine = sqlalchemy.create_engine('sqlite:///mydata.sqlitedb')
md = sqlalchemy.MetaData(engine)
table = sqlalchemy.Table('stuff', md, autoload=True)
upd = table.update(values={table.c.foo:table.c.foo+1})
engine.execute(upd)

This is slightly slower, but there’s not much in it.

Here’s my best guess for a SQLAlchemy ORM approach:

# snip definition of Stuff class made using declarative_base
# snip creation of session object
for c in session.query(Stuff):
    c.foo = c.foo + 1
session.flush()
session.commit()

This does the right thing, but it takes just under fifty times as long as the other two approaches. I presume that’s because it has to bring all the data into memory before it can work with it.

Is there any way to generate the efficient SQL using SQLAlchemy’s ORM? Or using any other python ORM? Or should I just go back to writing the SQL by hand?


回答 0

SQLAlchemy的ORM旨在与SQL层一起使用,而不是将其隐藏。但是,在同一事务中使用ORM和纯SQL时,您必须牢记一两件事。基本上,从一方面讲,仅当您从会话中清除更改时,ORM数据修改才会命中数据库。另一方面,SQL数据操作语句不会影响会话中的对象。

所以如果你说

for c in session.query(Stuff).all():
    c.foo = c.foo+1
session.commit()

它会按照说的去做,从数据库中获取所有对象,修改所有对象,然后在需要时将更改刷新到数据库中,一行一行地更新。

相反,您应该这样做:

session.execute(update(stuff_table, values={stuff_table.c.foo: stuff_table.c.foo + 1}))
session.commit()

这将像您期望的那样作为一个查询执行,并且因为至少默认会话配置在提交时使会话中的所有数据失效,所以您没有任何过时的数据问题。

在即将发布的0.5系列中,您还可以使用以下方法进行更新:

session.query(Stuff).update({Stuff.foo: Stuff.foo + 1})
session.commit()

基本上,它将运行与上一片段相同的SQL语句,但还会选择更改的行并使会话中的所有过时数据过期。如果您知道更新后没有使用任何会话数据,则也可以synchronize_session=False将其添加到update语句中并摆脱该选择。

SQLAlchemy’s ORM is meant to be used together with the SQL layer, not hide it. But you do have to keep one or two things in mind when using the ORM and plain SQL in the same transaction. Basically, from one side, ORM data modifications will only hit the database when you flush the changes from your session. From the other side, SQL data manipulation statements don’t affect the objects that are in your session.

So if you say

for c in session.query(Stuff).all():
    c.foo = c.foo+1
session.commit()

it will do what it says, go fetch all the objects from the database, modify all the objects and then when it’s time to flush the changes to the database, update the rows one by one.

Instead you should do this:

session.execute(update(stuff_table, values={stuff_table.c.foo: stuff_table.c.foo + 1}))
session.commit()

This will execute as one query as you would expect, and because at least the default session configuration expires all data in the session on commit you don’t have any stale data issues.

In the almost-released 0.5 series you could also use this method for updating:

session.query(Stuff).update({Stuff.foo: Stuff.foo + 1})
session.commit()

That will basically run the same SQL statement as the previous snippet, but also select the changed rows and expire any stale data in the session. If you know you aren’t using any session data after the update you could also add synchronize_session=False to the update statement and get rid of that select.


回答 1

session.query(Clients).filter(Clients.id == client_id_list).update({'status': status})
session.commit()

试试这个=)

session.query(Clients).filter(Clients.id == client_id_list).update({'status': status})
session.commit()

Try this =)


回答 2

有几种使用sqlalchemy进行更新的方法

1) for c in session.query(Stuff).all():
       c.foo += 1
   session.commit()

2) session.query().\
       update({"foo": (Stuff.foo + 1)})
   session.commit()

3) conn = engine.connect()
   stmt = Stuff.update().\
       values(Stuff.foo = (Stuff.foo + 1))
   conn.execute(stmt)

There are several ways to UPDATE using sqlalchemy

1) for c in session.query(Stuff).all():
       c.foo += 1
   session.commit()

2) session.query().\
       update({"foo": (Stuff.foo + 1)})
   session.commit()

3) conn = engine.connect()
   stmt = Stuff.update().\
       values(Stuff.foo = (Stuff.foo + 1))
   conn.execute(stmt)

回答 3

这是一个无需手动映射字段即可解决相同问题的示例:

from sqlalchemy import Column, ForeignKey, Integer, String, Date, DateTime, text, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.attributes import InstrumentedAttribute

engine = create_engine('postgres://postgres@localhost:5432/database')
session = sessionmaker()
session.configure(bind=engine)

Base = declarative_base()


class Media(Base):
  __tablename__ = 'media'
  id = Column(Integer, primary_key=True)
  title = Column(String, nullable=False)
  slug = Column(String, nullable=False)
  type = Column(String, nullable=False)

  def update(self):
    s = session()
    mapped_values = {}
    for item in Media.__dict__.iteritems():
      field_name = item[0]
      field_type = item[1]
      is_column = isinstance(field_type, InstrumentedAttribute)
      if is_column:
        mapped_values[field_name] = getattr(self, field_name)

    s.query(Media).filter(Media.id == self.id).update(mapped_values)
    s.commit()

因此,要更新Media实例,您可以执行以下操作:

media = Media(id=123, title="Titular Line", slug="titular-line", type="movie")
media.update()

Here’s an example of how to solve the same problem without having to map the fields manually:

from sqlalchemy import Column, ForeignKey, Integer, String, Date, DateTime, text, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.attributes import InstrumentedAttribute

engine = create_engine('postgres://postgres@localhost:5432/database')
session = sessionmaker()
session.configure(bind=engine)

Base = declarative_base()


class Media(Base):
  __tablename__ = 'media'
  id = Column(Integer, primary_key=True)
  title = Column(String, nullable=False)
  slug = Column(String, nullable=False)
  type = Column(String, nullable=False)

  def update(self):
    s = session()
    mapped_values = {}
    for item in Media.__dict__.iteritems():
      field_name = item[0]
      field_type = item[1]
      is_column = isinstance(field_type, InstrumentedAttribute)
      if is_column:
        mapped_values[field_name] = getattr(self, field_name)

    s.query(Media).filter(Media.id == self.id).update(mapped_values)
    s.commit()

So to update a Media instance, you can do something like this:

media = Media(id=123, title="Titular Line", slug="titular-line", type="movie")
media.update()

回答 4

经过足够的测试,我会尝试:

for c in session.query(Stuff).all():
     c.foo = c.foo+1
session.commit()

(IIRC,commit()在不使用flush()的情况下工作)。

我发现有时执行大型查询然后在python中进行迭代比许多查询快2个数量级。我假设遍历查询对象的效率不及遍历查询对象的all()方法生成的列表的效率。

[请注意下面的评论-这根本没有加快速度]。

Withough testing, I’d try:

for c in session.query(Stuff).all():
     c.foo = c.foo+1
session.commit()

(IIRC, commit() works without flush()).

I’ve found that at times doing a large query and then iterating in python can be up to 2 orders of magnitude faster than lots of queries. I assume that iterating over the query object is less efficient than iterating over a list generated by the all() method of the query object.

[Please note comment below – this did not speed things up at all].


回答 5

如果是由于创建对象方面的开销,那么使用SA可能根本无法加速。

如果是因为它正在加载相关对象,那么您可以通过延迟加载来执行某些操作。是否存在大量由于引用而创建的对象?(即,获取Company对象也将获取所有相关的People对象)。

If it is because of the overhead in terms of creating objects, then it probably can’t be sped up at all with SA.

If it is because it is loading up related objects, then you might be able to do something with lazy loading. Are there lots of objects being created due to references? (IE, getting a Company object also gets all of the related People objects).


SQLAlchemy:级联删除

问题:SQLAlchemy:级联删除

我必须缺少SQLAlchemy的级联选项的琐碎内容,因为我无法获得简单的级联删除来正确操作-如果删除了父元素,则子级将保留并带有null外键。

我在这里放了一个简洁的测试用例:

from sqlalchemy import Column, Integer, ForeignKey
from sqlalchemy.orm import relationship

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key = True)

class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key = True)
    parentid = Column(Integer, ForeignKey(Parent.id))
    parent = relationship(Parent, cascade = "all,delete", backref = "children")

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()

parent = Parent()
parent.children.append(Child())
parent.children.append(Child())
parent.children.append(Child())

session.add(parent)
session.commit()

print "Before delete, children = {0}".format(session.query(Child).count())
print "Before delete, parent = {0}".format(session.query(Parent).count())

session.delete(parent)
session.commit()

print "After delete, children = {0}".format(session.query(Child).count())
print "After delete parent = {0}".format(session.query(Parent).count())

session.close()

输出:

Before delete, children = 3
Before delete, parent = 1
After delete, children = 3
After delete parent = 0

父母与子女之间存在简单的一对多关系。该脚本创建一个父级,添加3个子级,然后提交。接下来,它删除父级,但子级仍然存在。为什么?如何使孩子级联删除?

I must be missing something trivial with SQLAlchemy’s cascade options because I cannot get a simple cascade delete to operate correctly — if a parent element is a deleted, the children persist, with null foreign keys.

I’ve put a concise test case here:

from sqlalchemy import Column, Integer, ForeignKey
from sqlalchemy.orm import relationship

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key = True)

class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key = True)
    parentid = Column(Integer, ForeignKey(Parent.id))
    parent = relationship(Parent, cascade = "all,delete", backref = "children")

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()

parent = Parent()
parent.children.append(Child())
parent.children.append(Child())
parent.children.append(Child())

session.add(parent)
session.commit()

print "Before delete, children = {0}".format(session.query(Child).count())
print "Before delete, parent = {0}".format(session.query(Parent).count())

session.delete(parent)
session.commit()

print "After delete, children = {0}".format(session.query(Child).count())
print "After delete parent = {0}".format(session.query(Parent).count())

session.close()

Output:

Before delete, children = 3
Before delete, parent = 1
After delete, children = 3
After delete parent = 0

There is a simple, one-to-many relationship between Parent and Child. The script creates a parent, adds 3 children, then commits. Next, it deletes the parent, but the children persist. Why? How do I make the children cascade delete?


回答 0

问题是sqlalchemy认为Child是父级的,因为这是您定义关系的地方(当然,它并不关心您将其称为“子级”)。

如果您在Parent类上定义关系,它将起作用:

children = relationship("Child", cascade="all,delete", backref="parent")

(请注意"Child"为字符串:使用声明式样式时允许这样做,以便您可以引用尚未定义的类)

您可能还想添加delete-orphandelete导致删除父级时删除子级,delete-orphan也删除从父级“删除”的所有子级,即使未删除父级也是如此)

编辑:刚刚发现:如果您确实想在Child类上定义关系,则可以这样做,但是您将必须在backref上定义级联(通过显式创建backref),如下所示:

parent = relationship(Parent, backref=backref("children", cascade="all,delete"))

(暗示from sqlalchemy.orm import backref

The problem is that sqlalchemy considers Child as the parent, because that is where you defined your relationship (it doesn’t care that you called it “Child” of course).

If you define the relationship on the Parent class instead, it will work:

children = relationship("Child", cascade="all,delete", backref="parent")

(note "Child" as a string: this is allowed when using the declarative style, so that you are able to refer to a class that is not yet defined)

You might want to add delete-orphan as well (delete causes children to be deleted when the parent gets deleted, delete-orphan also deletes any children that were “removed” from the parent, even if the parent is not deleted)

EDIT: just found out: if you really want to define the relationship on the Child class, you can do so, but you will have to define the cascade on the backref (by creating the backref explicitly), like this:

parent = relationship(Parent, backref=backref("children", cascade="all,delete"))

(implying from sqlalchemy.orm import backref)


回答 1

当您删除@Steven的附件时,session.delete()这是一件好事,对于我而言,这永远不会发生。我注意到大部分时间都是通过删除session.query().filter().delete()(它不会将元素放入内存中并直接从db中删除)。使用此方法sqlalchemy cascade='all, delete'无效。但是,有一个解决方案:ON DELETE CASCADE通过db(注意:并非所有数据库都支持它)。

class Child(Base):
    __tablename__ = "children"

    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parents.id", ondelete='CASCADE'))

class Parent(Base):
    __tablename__ = "parents"

    id = Column(Integer, primary_key=True)
    child = relationship(Child, backref="parent", passive_deletes=True)

@Steven’s asnwer is good when you are deleting through session.delete() which never happens in my case. I noticed that most of the time I delete through session.query().filter().delete() (which doesn’t put elements in the memory and deletes directly from db). Using this method sqlalchemy’s cascade='all, delete' doesn’t work. There is a solution though: ON DELETE CASCADE through db (note: not all databases support it).

class Child(Base):
    __tablename__ = "children"

    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parents.id", ondelete='CASCADE'))

class Parent(Base):
    __tablename__ = "parents"

    id = Column(Integer, primary_key=True)
    child = relationship(Child, backref="parent", passive_deletes=True)

回答 2

很老的帖子,但是我只是花了一两个小时,所以我想分享我的发现,特别是因为列出的其他一些评论不太正确。

TL; DR

给子表一个外部表或修改现有表,并添加ondelete='CASCADE'

parent_id = db.Column(db.Integer, db.ForeignKey('parent.id', ondelete='CASCADE'))

一个下列关系:

a)在父表上:

children = db.relationship('Child', backref='parent', passive_deletes=True)

b)在子表上:

parent = db.relationship('Parent', backref=backref('children', passive_deletes=True))

细节

首先,尽管接受了答案,但父母/子女关系不是通过使用建立的relationship,而是通过使用建立的ForeignKey。您可以将它relationship放在父表或子表上,它将正常工作。尽管显然在子表上,backref除了关键字参数之外,您还必须使用该函数。

选项1(首选)

其次,SqlAlchemy支持两种不同的级联。我建议的第一个和第一个建议是内置于数据库中的,通常采取对外键声明的约束形式。在PostgreSQL中,它看起来像这样:

CONSTRAINT child_parent_id_fkey FOREIGN KEY (parent_id)
REFERENCES parent_table(id) MATCH SIMPLE
ON DELETE CASCADE

这意味着当您从中删除记录时parent_table,数据库中的所有相应行都child_table将为您删除。它快速可靠,可能是您最好的选择。您可以通过以下方式在SqlAlchemy中进行设置ForeignKey(子表定义的一部分):

parent_id = db.Column(db.Integer, db.ForeignKey('parent.id', ondelete='CASCADE'))
parent = db.relationship('Parent', backref=backref('children', passive_deletes=True))

ondelete='CASCADE'是创建零件ON DELETE CASCADE放在桌子上。

知道了!

这里有一个重要的警告。请注意我如何relationship指定passive_deletes=True?如果没有的话,整个事情将无法进行。这是因为默认情况下,当您删除父记录时,SqlAlchemy所做的事情确实很奇怪。它将所有子行的外键设置为NULL。因此,如果您从parent_tablewhere id= 5 删除一行,那么它将基本上执行

UPDATE child_table SET parent_id = NULL WHERE parent_id = 5

为什么你要这个我不知道。如果许多数据库引擎甚至允许您将有效外键设置为NULL,那么我会感到很惊讶,从而创建了一个孤儿。似乎是个坏主意,但也许有一个用例。无论如何,如果让SqlAlchemy执行此操作,则将防止数据库能够使用ON DELETE CASCADE您设置的清理子级。这是因为它依靠那些外键来知道要删除哪些子行。一旦SqlAlchemy将它们全部设置为NULL,数据库将无法删除它们。设置passive_deletes=Trueprevent可以防止SqlAlchemy NULL读出外键。

您可以在SqlAlchemy文档中阅读有关被动删除的更多信息。

选项2

您可以执行的另一种方法是让SqlAlchemy为您完成。这是使用的cascade参数设置的relationship。如果您在父表上定义了关系,则它看起来像这样:

children = relationship('Child', cascade='all,delete', backref='parent')

如果该关系与孩子有关,则可以这样进行:

parent = relationship('Parent', backref=backref('children', cascade='all,delete'))

同样,这是孩子,因此您必须调用一个称为的方法backref并将级联数据放入其中。

这样,当您删除父行时,SqlAlchemy实际上将运行delete语句供您清理子行。如果您愿意,这可能不如让该数据库处理有效,所以我不建议这样做。

这是有关其支持的级联功能的SqlAlchemy文档

Pretty old post, but I just spent an hour or two on this, so I wanted to share my finding, especially since some of the other comments listed aren’t quite right.

TL;DR

Give the child table a foreign or modify the existing one, adding ondelete='CASCADE':

parent_id = db.Column(db.Integer, db.ForeignKey('parent.id', ondelete='CASCADE'))

And one of the following relationships:

a) This on the parent table:

children = db.relationship('Child', backref='parent', passive_deletes=True)

b) Or this on the child table:

parent = db.relationship('Parent', backref=backref('children', passive_deletes=True))

Details

First off, despite what the accepted answer says, the parent/child relationship is not established by using relationship, it’s established by using ForeignKey. You can put the relationship on either the parent or child tables and it will work fine. Although, apparently on the child tables, you have to use the backref function in addition to the keyword argument.

Option 1 (preferred)

Second, SqlAlchemy supports two different kinds of cascading. The first, and the one I recommend, is built into your database and usually takes the form of a constraint on the foreign key declaration. In PostgreSQL it looks like this:

CONSTRAINT child_parent_id_fkey FOREIGN KEY (parent_id)
REFERENCES parent_table(id) MATCH SIMPLE
ON DELETE CASCADE

This means that when you delete a record from parent_table, then all the corresponding rows in child_table will be deleted for you by the database. It’s fast and reliable and probably your best bet. You set this up in SqlAlchemy through ForeignKey like this (part of the child table definition):

parent_id = db.Column(db.Integer, db.ForeignKey('parent.id', ondelete='CASCADE'))
parent = db.relationship('Parent', backref=backref('children', passive_deletes=True))

The ondelete='CASCADE' is the part that creates the ON DELETE CASCADE on the table.

Gotcha!

There’s an important caveat here. Notice how I have a relationship specified with passive_deletes=True? If you don’t have that, the entire thing will not work. This is because by default when you delete a parent record SqlAlchemy does something really weird. It sets the foreign keys of all child rows to NULL. So if you delete a row from parent_table where id = 5, then it will basically execute

UPDATE child_table SET parent_id = NULL WHERE parent_id = 5

Why you would want this I have no idea. I’d be surprised if many database engines even allowed you to set a valid foreign key to NULL, creating an orphan. Seems like a bad idea, but maybe there’s a use case. Anyway, if you let SqlAlchemy do this, you will prevent the database from being able to clean up the children using the ON DELETE CASCADE that you set up. This is because it relies on those foreign keys to know which child rows to delete. Once SqlAlchemy has set them all to NULL, the database can’t delete them. Setting the passive_deletes=True prevents SqlAlchemy from NULLing out the foreign keys.

You can read more about passive deletes in the SqlAlchemy docs.

Option 2

The other way you can do it is to let SqlAlchemy do it for you. This is set up using the cascade argument of the relationship. If you have the relationship defined on the parent table, it looks like this:

children = relationship('Child', cascade='all,delete', backref='parent')

If the relationship is on the child, you do it like this:

parent = relationship('Parent', backref=backref('children', cascade='all,delete'))

Again, this is the child so you have to call a method called backref and putting the cascade data in there.

With this in place, when you delete a parent row, SqlAlchemy will actually run delete statements for you to clean up the child rows. This will likely not be as efficient as letting this database handle if for you so I don’t recommend it.

Here are the SqlAlchemy docs on the cascading features it supports.


回答 3

Steven是正确的,因为您需要显式创建backref,这将导致级联被应用到父级(而不是像在测试场景中那样被应用于子级)。

但是,在Child上定义关系不会使sqlalchemy将Child视为父级。定义关系的位置(子级或父级)都无关紧要,它的外键链接两个确定父级和子级的表。

不过,遵循一个惯例是有意义的,并且根据史蒂文的回应,我正在定义我所有与父母的孩子关系。

Steven is correct in that you need to explicitly create the backref, this results in the cascade being applied on the parent (as opposed to it being applied to the child like in the test scenario).

However, defining the relationship on the Child does NOT make sqlalchemy consider Child the parent. It doesn’t matter where the relationship is defined (child or parent), its the foreign key that links the two tables that determines which is the parent and which is the child.

It makes sense to stick to one convention though, and based on Steven’s response, I’m defining all my child relationships on the parent.


回答 4

我也为文档苦苦挣扎,但是发现文档字符串本身比手册更容易。例如,如果您从sqlalchemy.orm导入关系并执行help(relationship),它将为您提供可以为级联指定的所有选项。项目符号为delete-orphan

如果检测到没有父母的孩子类型的项目,则将其标记为删除。
请注意,此选项可防止在没有父母出席的情况下持久保留孩子Class中待处理的项目。

我知道您的问题更多地在于定义父子关系的文档的方式。但是似乎您也可能对层叠选项有疑问,因为"all"include "delete""delete-orphan"是唯一未包含的选项"all"

I struggled with the documentation as well, but found that the docstrings themselves tend to be easier than the manual. For example, if you import relationship from sqlalchemy.orm and do help(relationship), it will give you all the options you can specify for cascade. The bullet for delete-orphan says:

if an item of the child’s type with no parent is detected, mark it for deletion.
Note that this option prevents a pending item of the child’s class from being persisted without a parent present.

I realize your issue was more with the way the documentation for defining parent-child relationships. But it seemed that you might also be having a problem with the cascade options, because "all" includes "delete". "delete-orphan" is the only option that’s not included in "all".


回答 5

史蒂文的答案很坚定。我想指出另外一个含义。

通过使用relationship,您将使应用层(Flask)负责引用完整性。这意味着其他不通过Flask访问数据库的进程(例如数据库实用程序或直接连接到数据库的人)将不会遇到这些约束,并且可能以破坏您如此努力设计的逻辑数据模型的方式更改数据。

尽可能使用ForeignKeyd512和Alex描述的方法。DB引擎非常擅长真正地执行约束(以不可避免的方式),因此,这是保持数据完整性的最佳策略。您唯一需要依赖应用程序来处理数据完整性的时间是数据库无法处理数据完整性时,例如不支持外键的SQLite版本。

如果您需要在实体之间创建进一步的链接以启用诸如导航父子对象关系之类的应用行为backref,请与结合使用ForeignKey

Steven’s answer is solid. I’d like to point out an additional implication.

By using relationship, you’re making the app layer (Flask) responsible for referential integrity. That means other processes that access the database not through Flask, like a database utility or a person connecting to the database directly, will not experience those constraints and could change your data in a way that breaks the logical data model you worked so hard to design.

Whenever possible, use the ForeignKey approach described by d512 and Alex. The DB engine is very good at truly enforcing constraints (in an unavoidable way), so this is by far the best strategy for maintaining data integrity. The only time you need to rely on an app to handle data integrity is when the database can’t handle them, e.g. versions of SQLite that don’t support foreign keys.

If you need to create further linkage among entities to enable app behaviors like navigating parent-child object relationships, use backref in conjunction with ForeignKey.


回答 6

Stevan的回答是完美的。但是,如果仍然出现错误。在此之上的其他可能的尝试是-

http://vincentaudebert.github.io/python/sql/2015/10/09/cascade-delete-sqlalchemy/

从链接复制-

快速提示:即使您在模型中指定了级联删除,如果您遇到外键依赖关系时遇到麻烦。

使用SQLAlchemy指定cascade='all, delete'父级表上应具有的级联删除。好的,但是当您执行类似的操作时:

session.query(models.yourmodule.YourParentTable).filter(conditions).delete()

实际上,它会触发有关您的子表中使用的外键的错误。

我用它来查询对象然后删除它的解决方案:

session = models.DBSession()
your_db_object = session.query(models.yourmodule.YourParentTable).filter(conditions).first()
if your_db_object is not None:
    session.delete(your_db_object)

这将删除您的父记录以及与其关联的所有子记录。

Answer by Stevan is perfect. But if you are still getting the error. Other possible try on top of that would be –

http://vincentaudebert.github.io/python/sql/2015/10/09/cascade-delete-sqlalchemy/

Copied from the link-

Quick tip if you get in trouble with a foreign key dependency even if you have specified a cascade delete in your models.

Using SQLAlchemy, to specify a cascade delete you should have cascade='all, delete' on your parent table. Ok but then when you execute something like:

session.query(models.yourmodule.YourParentTable).filter(conditions).delete()

It actually triggers an error about a foreign key used in your children tables.

The solution I used it to query the object and then delete it:

session = models.DBSession()
your_db_object = session.query(models.yourmodule.YourParentTable).filter(conditions).first()
if your_db_object is not None:
    session.delete(your_db_object)

This should delete your parent record AND all the children associated with it.


回答 7

Alex Okrushko的回答对我来说几乎是最好的。结合使用ondelete =’CASCADE’和passive_deletes = True。但是我必须做些额外的事情才能使其在sqlite中起作用。

Base = declarative_base()
ROOM_TABLE = "roomdata"
FURNITURE_TABLE = "furnituredata"

class DBFurniture(Base):
    __tablename__ = FURNITURE_TABLE
    id = Column(Integer, primary_key=True)
    room_id = Column(Integer, ForeignKey('roomdata.id', ondelete='CASCADE'))


class DBRoom(Base):
    __tablename__ = ROOM_TABLE
    id = Column(Integer, primary_key=True)
    furniture = relationship("DBFurniture", backref="room", passive_deletes=True)

确保添加此代码以确保其适用于sqlite。

from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlite3 import Connection as SQLite3Connection

@event.listens_for(Engine, "connect")
def _set_sqlite_pragma(dbapi_connection, connection_record):
    if isinstance(dbapi_connection, SQLite3Connection):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON;")
        cursor.close()

从这里偷来的:SQLAlchemy表达式语言和SQLite的删除级联

Alex Okrushko answer almost worked best for me. Used ondelete=’CASCADE’ and passive_deletes=True combined. But I had to do something extra to make it work for sqlite.

Base = declarative_base()
ROOM_TABLE = "roomdata"
FURNITURE_TABLE = "furnituredata"

class DBFurniture(Base):
    __tablename__ = FURNITURE_TABLE
    id = Column(Integer, primary_key=True)
    room_id = Column(Integer, ForeignKey('roomdata.id', ondelete='CASCADE'))


class DBRoom(Base):
    __tablename__ = ROOM_TABLE
    id = Column(Integer, primary_key=True)
    furniture = relationship("DBFurniture", backref="room", passive_deletes=True)

Make sure to add this code to ensure it works for sqlite.

from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlite3 import Connection as SQLite3Connection

@event.listens_for(Engine, "connect")
def _set_sqlite_pragma(dbapi_connection, connection_record):
    if isinstance(dbapi_connection, SQLite3Connection):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON;")
        cursor.close()

Stolen from here: SQLAlchemy expression language and SQLite’s on delete cascade


回答 8

TLDR:如果上述解决方案不起作用,请尝试将nullable = False添加到您的列中。

我想在这里为一些可能无法使层叠功能与现有解决方案配合使用的人提供一个小技巧(很棒)。我的工作和示例之间的主要区别是我使用了自动映射。我不确切知道这可能如何影响级联的设置,但是我想指出我使用了它。我也在使用SQLite数据库。

我尝试了这里描述的所有解决方案,但是当删除父行时,子表中的行继续将其外键设置为null。我在这里尝试了所有解决方案都无济于事。但是,一旦我将带有外键的子列设置为nullable = False,级联就可以工作。

在子表上,我添加了:

Column('parent_id', Integer(), ForeignKey('parent.id', ondelete="CASCADE"), nullable=False)
Child.parent = relationship("parent", backref=backref("children", passive_deletes=True)

通过此设置,级联可以按预期运行。

TLDR: If the above solutions don’t work, try adding nullable=False to your column.

I’d like to add a small point here for some people who may not get the cascade function to work with the existing solutions (which are great). The main difference between my work and the example was that I used automap. I do not know exactly how that might interfere with the setup of cascades, but I want to note that I used it. I am also working with a SQLite database.

I tried every solution described here, but rows in my child table continued to have their foreign key set to null when the parent row was deleted. I’d tried all the solutions here to no avail. However, the cascade worked once I set the child column with the foreign key to nullable = False.

On the child table, I added:

Column('parent_id', Integer(), ForeignKey('parent.id', ondelete="CASCADE"), nullable=False)
Child.parent = relationship("parent", backref=backref("children", passive_deletes=True)

With this setup, the cascade functioned as expected.


sqlalchemy flush()并获取ID?

问题:sqlalchemy flush()并获取ID?

我想做这样的事情:

f = Foo(bar='x')
session.add(f)
session.flush()

# do additional queries using f.id before commit()
print f.id # should be not None

session.commit()

但是f.idNone我尝试的时候。我怎样才能使它工作?

I want to do something like this:

f = Foo(bar='x')
session.add(f)
session.flush()

# do additional queries using f.id before commit()
print f.id # should be not None

session.commit()

But f.id is None when I try it. How can I get this to work?


回答 0

您的示例代码应该已经按原样工作。SQLAlchemy应该为其提供一个值f.id,并假设其为自动生成的主键列。主键属性在flush()生成时立即在过程中填充,并且不需要调用commit()。因此,这里的答案在于以下一项或多项:

  1. 映射的详细信息
  2. 如果使用的后端有任何奇怪现象(例如,SQLite不会为复合主键生成整数值)
  3. 打开echo时发出的SQL表示什么

Your sample code should have worked as it is. SQLAlchemy should be providing a value for f.id, assuming its an autogenerating primary-key column. Primary-key attributes are populated immediately within the flush() process as they are generated, and no call to commit() should be required. So the answer here lies in one or more of the following:

  1. The details of your mapping
  2. If there are any odd quirks of the backend in use (such as, SQLite doesn’t generate integer values for a composite primary key)
  3. What the emitted SQL says when you turn on echo

回答 1

我遇到了同样的问题,经过测试,我发现这些答案中没有一个是足够的。

当前,或者从sqlalchemy .6+开始,有一个非常简单的解决方案(我不知道它是否存在于以前的版本中,尽管我认为确实存在):

session.refresh()

因此,您的代码将如下所示:

f = Foo(bar=x)
session.add(f)
session.flush()
# At this point, the object f has been pushed to the DB, 
# and has been automatically assigned a unique primary key id

f.id
# is None

session.refresh(f)
# refresh updates given object in the session with its state in the DB
# (and can also only refresh certain attributes - search for documentation)

f.id
# is the automatically assigned primary key ID given in the database.

就是这样。

I’ve just run across the same problem, and after testing I have found that NONE of these answers are sufficient.

Currently, or as of sqlalchemy .6+, there is a very simple solution (I don’t know if this exists in prior version, though I imagine it does):

session.refresh()

So, your code would look something like this:

f = Foo(bar=x)
session.add(f)
session.flush()
# At this point, the object f has been pushed to the DB, 
# and has been automatically assigned a unique primary key id

f.id
# is None

session.refresh(f)
# refresh updates given object in the session with its state in the DB
# (and can also only refresh certain attributes - search for documentation)

f.id
# is the automatically assigned primary key ID given in the database.

That’s how to do it.


回答 2

谢谢大家 我通过修改列映射解决了我的问题。对我来说,autoincrement=True是必需的。

起源:

id = Column('ID', Integer, primary_key=True, nullable=False)

修改后:

id = Column('ID', Integer, primary_key=True, autoincrement=True, nullable=True)

然后

session.flush()  
print(f.id)

没关系!

Thanks for everybody. I solved my problem by modifying the column mapping. For me, autoincrement=True is required.

origin:

id = Column('ID', Integer, primary_key=True, nullable=False)

after modified:

id = Column('ID', Integer, primary_key=True, autoincrement=True, nullable=True)

then

session.flush()  
print(f.id)

is ok!


回答 3

与dpb给出的答案不同,不需要刷新。刷新后,您可以访问id字段,sqlalchemy会自动刷新在后端自动生成的id

我遇到了这个问题,经过一番调查后找出了确切的原因,我的模型是用id作为integerfield创建的,而在我的表单中,id用hiddenfield表示(因为我不想在表单中显示id)。默认情况下,隐藏字段表示为文本。一旦我使用widget = hiddenInput()将表单更改为integerfield,问题就解决了。

unlike the answer given by dpb, a refresh is not necessary. once you flush, you can access the id field, sqlalchemy automatically refreshes the id which is auto generated at the backend

I encountered this problem and figured the exact reason after some investigation, my model was created with id as integerfield and in my form the id was represented with hiddenfield( since i did not wanted to show the id in my form). The hidden field is by default represented as a text. once I changed the form to integerfield with widget=hiddenInput()) the problem was solved.


回答 4

我曾经0在调用session.add方法之前分配给ID时遇到问题。该ID已由数据库正确分配,但在之后的会话中未检索到正确的ID session.flush()

I once had a problem with having assigned 0 to id before calling session.add method. The id was correctly assigned by the database but the correct id was not retrieved from the session after session.flush().


回答 5

您应该尝试使用session.save_or_update(f)而不是session.add(f)

You should try using session.save_or_update(f) instead of session.add(f).


SQLAlchemy版本控制关心类的导入顺序

问题:SQLAlchemy版本控制关心类的导入顺序

我在这里遵循指南:

http://www.sqlalchemy.org/docs/orm/examples.html?highlight=versioning#versioned-objects

并遇到了一个问题。我的关系定义如下:

generic_ticker = relation('MyClass', backref=backref("stuffs"))

使用字符串,因此它不在乎模型模块的导入顺序。这一切都正常工作,但是当我使用版本控制元时,出现以下错误:

sqlalchemy.exc.InvalidRequestError:初始化映射程序Mapper | MyClass | stuffs时,表达式’Trader’找不到名称(“名称’MyClass’未定义”)。如果这是一个类名,请考虑在定义了两个从属类之后,将这个Relationship()添加到类中。

我跟踪到以下错误:

  File "/home/nick/workspace/gm3/gm3/lib/history_meta.py", line 90, in __init__
    mapper = class_mapper(cls)
  File "/home/nick/venv/tg2env/lib/python2.6/site-packages/sqlalchemy/orm/util.py", line 622, in class_mapper
    mapper = mapper.compile()

class VersionedMeta(DeclarativeMeta):
    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)

        try:
            mapper = class_mapper(cls)
            _history_mapper(mapper)
        except UnmappedClassError:
            pass

我通过尝试一下来解决了这个问题:将所有东西都放入lambda中,然后在所有导入操作完成后再运行它们。这有效,但似乎有点垃圾,如何解决此问题的任何想法是更好的方法?

谢谢!

更新资料

问题实际上与导入顺序无关。设计版本控制示例时,使映射器需要在每个版本控制类的构造函数中进行编译。当尚未定义相关的类时,编译将失败。如果是循环关系,则无法通过更改映射类的定义顺序来使其工作。

更新2

如上述更新所述(我不知道您可以在此处编辑其他人的帖子:))这很可能是由于循环引用。在这种情况下,可能有人会发现我的黑客很有用(我将其与tu​​rbogears一起使用)(替换VersionedMeta并在history_meta中全局添加create_mappers)

create_mappers = []
class VersionedMeta(DeclarativeMeta):
    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)
        #I added this code in as it was crashing otherwise
        def make_mapper():
            try:
                mapper = class_mapper(cls)
                _history_mapper(mapper)
            except UnmappedClassError:
                pass

        create_mappers.append(lambda: make_mapper())

然后,您可以在模型__init__.py中执行以下操作

# Import your model modules here.
from myproj.lib.history_meta import create_mappers

from myproj.model.misc import *
from myproj.model.actor import *
from myproj.model.stuff1 import *
from myproj.model.instrument import *
from myproj.model.stuff import *

#setup the history
[func() for func in create_mappers]

这样,仅在定义了所有类之后,它才创建映射器。

更新3 略微无关,但在某些情况下我遇到了重复的主键错误(一次对同一对象进行2次更改)。我的解决方法是添加一个新的主自动增量键。当然,使用mysql不能超过1个,因此我不得不取消对用于创建历史表的现有内容的主键。查看我的整体代码(包括hist_id并摆脱外键约束):

"""Stolen from the offical sqlalchemy recpies
"""
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, Integer
from sqlalchemy.orm.interfaces import SessionExtension
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.types import DateTime
import datetime
from sqlalchemy.orm.session import Session

def col_references_table(col, table):
    for fk in col.foreign_keys:
        if fk.references(table):
            return True
    return False

def _history_mapper(local_mapper):
    cls = local_mapper.class_

    # set the "active_history" flag
    # on on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True

    super_mapper = local_mapper.inherits
    super_history_mapper = getattr(cls, '__history_mapper__', None)

    polymorphic_on = None
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        cols = []
        for column in local_mapper.local_table.c:
            if column.name == 'version':
                continue

            col = column.copy()
            col.unique = False

            #don't auto increment stuff from the normal db
            if col.autoincrement:
                col.autoincrement = False
            #sqllite falls over with auto incrementing keys if we have a composite key
            if col.primary_key:
                col.primary_key = False

            if super_mapper and col_references_table(column, super_mapper.local_table):
                super_fks.append((col.key, list(super_history_mapper.base_mapper.local_table.primary_key)[0]))

            cols.append(col)

            if column is local_mapper.polymorphic_on:
                polymorphic_on = col

        #if super_mapper:
        #    super_fks.append(('version', super_history_mapper.base_mapper.local_table.c.version))

        cols.append(Column('hist_id', Integer, primary_key=True, autoincrement=True))
        cols.append(Column('version', Integer))
        cols.append(Column('changed', DateTime, default=datetime.datetime.now))

        if super_fks:
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))

        table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
                      *cols, mysql_engine='InnoDB')
    else:
        # single table inheritance.  take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                super_history_mapper.local_table.append_column(col)
        table = None

    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})

    m = mapper(
            versioned_cls, 
            table, 
            inherits=super_history_mapper, 
            polymorphic_on=polymorphic_on,
            polymorphic_identity=local_mapper.polymorphic_identity
            )
    cls.__history_mapper__ = m

    if not super_history_mapper:
        cls.version = Column('version', Integer, default=1, nullable=False)

create_mappers = []

class VersionedMeta(DeclarativeMeta):
    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)
        #I added this code in as it was crashing otherwise
        def make_mapper():
            try:
                mapper = class_mapper(cls)
                _history_mapper(mapper)
            except UnmappedClassError:
                pass

        create_mappers.append(lambda: make_mapper())

def versioned_objects(iter):
    for obj in iter:
        if hasattr(obj, '__history_mapper__'):
            yield obj

def create_version(obj, session, deleted = False):
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_

    obj_state = attributes.instance_state(obj)

    attr = {}

    obj_changed = False

    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue

        for hist_col in hm.local_table.c:
            if hist_col.key == 'version' or hist_col.key == 'changed' or hist_col.key == 'hist_id':
                continue

            obj_col = om.local_table.c[hist_col.key]

            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column.  this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be 
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the 
                # base class is a feature of the declarative module as of sqla 0.5.2.
                continue

            # expired object attributes and also deferred cols might not be in the
            # dict.  force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            a, u, d = attributes.get_history(obj, prop.key)

            if d:
                attr[hist_col.key] = d[0]
                obj_changed = True
            elif u:
                attr[hist_col.key] = u[0]
            else:
                # if the attribute had no value.
                attr[hist_col.key] = a[0]
                obj_changed = True

    if not obj_changed:
        # not changed, but we have relationships.  OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
                attributes.get_history(obj, prop.key).has_changes():
                obj_changed = True
                break

    if not obj_changed and not deleted:
        return

    attr['version'] = obj.version
    hist = history_cls()
    for key, value in attr.iteritems():
        setattr(hist, key, value)

    obj.version += 1
    session.add(hist)

class VersionedListener(SessionExtension):
    def before_flush(self, session, flush_context, instances):
        for obj in versioned_objects(session.dirty):
            create_version(obj, session)
        for obj in versioned_objects(session.deleted):
            create_version(obj, session, deleted = True)

I was following the guide here:

http://www.sqlalchemy.org/docs/orm/examples.html?highlight=versioning#versioned-objects

and have come across an issue. I have defined my relationships like:

generic_ticker = relation('MyClass', backref=backref("stuffs"))

with strings so it doesn’t care about the import order of my model modules. This all works fine normally, but when I use the versioning meta I get the following error:

sqlalchemy.exc.InvalidRequestError: When initializing mapper Mapper|MyClass|stuffs, expression ‘Trader’ failed to locate a name (“name ‘MyClass’ is not defined”). If this is a class name, consider adding this relationship() to the class after both dependent classes have been defined.

I tracked down the error to:

  File "/home/nick/workspace/gm3/gm3/lib/history_meta.py", line 90, in __init__
    mapper = class_mapper(cls)
  File "/home/nick/venv/tg2env/lib/python2.6/site-packages/sqlalchemy/orm/util.py", line 622, in class_mapper
    mapper = mapper.compile()

class VersionedMeta(DeclarativeMeta):
    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)

        try:
            mapper = class_mapper(cls)
            _history_mapper(mapper)
        except UnmappedClassError:
            pass

I fixed the problem by putting the try: except stuff in a lambda and running them all after all the imports have happened. This works but seems a bit rubbish, any ideas of how to fix this is a better way?

Thanks!

Update

The problem is not actually about import order. The versioning example is designed such that mapper requires compilation in costructor of each versioned class. And compilation fails when related classes are not yet defined. In case of circular relations there is no way to make it working by changing definition order of mapped classes.

Update 2

As the above update states (I didn’t know you could edit other people’s posts on here :)) this is likely due to circular references. In which case may be someone will find my hack useful (I’m using it with turbogears) (Replace VersionedMeta and add in create_mappers global in history_meta)

create_mappers = []
class VersionedMeta(DeclarativeMeta):
    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)
        #I added this code in as it was crashing otherwise
        def make_mapper():
            try:
                mapper = class_mapper(cls)
                _history_mapper(mapper)
            except UnmappedClassError:
                pass

        create_mappers.append(lambda: make_mapper())

Then you can do something like the following in your models __init__.py

# Import your model modules here.
from myproj.lib.history_meta import create_mappers

from myproj.model.misc import *
from myproj.model.actor import *
from myproj.model.stuff1 import *
from myproj.model.instrument import *
from myproj.model.stuff import *

#setup the history
[func() for func in create_mappers]

That way it create the mappers only after all the classes have been defined.

Update 3 Slightly unrelated but I came across a duplicate primary key error in some circumstances (committing 2 changes to the same object in one go). My workaround has been to add a new primary auto-incrementing key. Of course you can’t have more than 1 with mysql so I had to de-primary key the existing stuff used to create the history table. Check out my overall code (including a hist_id and getting rid of the foreign key constraint):

"""Stolen from the offical sqlalchemy recpies
"""
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, Integer
from sqlalchemy.orm.interfaces import SessionExtension
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.types import DateTime
import datetime
from sqlalchemy.orm.session import Session

def col_references_table(col, table):
    for fk in col.foreign_keys:
        if fk.references(table):
            return True
    return False

def _history_mapper(local_mapper):
    cls = local_mapper.class_

    # set the "active_history" flag
    # on on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True

    super_mapper = local_mapper.inherits
    super_history_mapper = getattr(cls, '__history_mapper__', None)

    polymorphic_on = None
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        cols = []
        for column in local_mapper.local_table.c:
            if column.name == 'version':
                continue

            col = column.copy()
            col.unique = False

            #don't auto increment stuff from the normal db
            if col.autoincrement:
                col.autoincrement = False
            #sqllite falls over with auto incrementing keys if we have a composite key
            if col.primary_key:
                col.primary_key = False

            if super_mapper and col_references_table(column, super_mapper.local_table):
                super_fks.append((col.key, list(super_history_mapper.base_mapper.local_table.primary_key)[0]))

            cols.append(col)

            if column is local_mapper.polymorphic_on:
                polymorphic_on = col

        #if super_mapper:
        #    super_fks.append(('version', super_history_mapper.base_mapper.local_table.c.version))

        cols.append(Column('hist_id', Integer, primary_key=True, autoincrement=True))
        cols.append(Column('version', Integer))
        cols.append(Column('changed', DateTime, default=datetime.datetime.now))

        if super_fks:
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))

        table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
                      *cols, mysql_engine='InnoDB')
    else:
        # single table inheritance.  take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                super_history_mapper.local_table.append_column(col)
        table = None

    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})

    m = mapper(
            versioned_cls, 
            table, 
            inherits=super_history_mapper, 
            polymorphic_on=polymorphic_on,
            polymorphic_identity=local_mapper.polymorphic_identity
            )
    cls.__history_mapper__ = m

    if not super_history_mapper:
        cls.version = Column('version', Integer, default=1, nullable=False)

create_mappers = []

class VersionedMeta(DeclarativeMeta):
    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)
        #I added this code in as it was crashing otherwise
        def make_mapper():
            try:
                mapper = class_mapper(cls)
                _history_mapper(mapper)
            except UnmappedClassError:
                pass

        create_mappers.append(lambda: make_mapper())

def versioned_objects(iter):
    for obj in iter:
        if hasattr(obj, '__history_mapper__'):
            yield obj

def create_version(obj, session, deleted = False):
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_

    obj_state = attributes.instance_state(obj)

    attr = {}

    obj_changed = False

    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue

        for hist_col in hm.local_table.c:
            if hist_col.key == 'version' or hist_col.key == 'changed' or hist_col.key == 'hist_id':
                continue

            obj_col = om.local_table.c[hist_col.key]

            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column.  this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be 
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the 
                # base class is a feature of the declarative module as of sqla 0.5.2.
                continue

            # expired object attributes and also deferred cols might not be in the
            # dict.  force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            a, u, d = attributes.get_history(obj, prop.key)

            if d:
                attr[hist_col.key] = d[0]
                obj_changed = True
            elif u:
                attr[hist_col.key] = u[0]
            else:
                # if the attribute had no value.
                attr[hist_col.key] = a[0]
                obj_changed = True

    if not obj_changed:
        # not changed, but we have relationships.  OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
                attributes.get_history(obj, prop.key).has_changes():
                obj_changed = True
                break

    if not obj_changed and not deleted:
        return

    attr['version'] = obj.version
    hist = history_cls()
    for key, value in attr.iteritems():
        setattr(hist, key, value)

    obj.version += 1
    session.add(hist)

class VersionedListener(SessionExtension):
    def before_flush(self, session, flush_context, instances):
        for obj in versioned_objects(session.dirty):
            create_version(obj, session)
        for obj in versioned_objects(session.deleted):
            create_version(obj, session, deleted = True)

回答 0

我通过尝试一下来解决了这个问题:将所有东西都放入lambda中,然后在所有导入操作完成后再运行它们。

大!

I fixed the problem by putting the try: except stuff in a lambda and running them all after all the imports have happened.

Great!


Flask-SQLAlchemy如何删除单个表中的所有行

问题:Flask-SQLAlchemy如何删除单个表中的所有行

如何使用Flask-SQLAlchemy删除单个表中的所有行?

寻找这样的事情:

>>> users = models.User.query.all()
>>> models.db.session.delete(users)

# but it errs out: UnmappedInstanceError: Class '__builtin__.list' is not mapped

How do I delete all rows in a single table using Flask-SQLAlchemy?

Looking for something like this:

>>> users = models.User.query.all()
>>> models.db.session.delete(users)

# but it errs out: UnmappedInstanceError: Class '__builtin__.list' is not mapped

回答 0

尝试delete

models.User.query.delete()

文档Returns the number of rows deleted, excluding any cascades.

Try delete:

models.User.query.delete()

From the docs: Returns the number of rows deleted, excluding any cascades.


回答 1

DazWorrall的答案是正确的。如果您的代码结构不同于OP的结构,那么以下变体可能会很有用:

num_rows_deleted = db.session.query(Model).delete()

另外,不要忘记在您提交后删除操作才会生效,如以下代码段所示:

try:
    num_rows_deleted = db.session.query(Model).delete()
    db.session.commit()
except:
    db.session.rollback()

DazWorrall’s answer is spot on. Here’s a variation that might be useful if your code is structured differently than the OP’s:

num_rows_deleted = db.session.query(Model).delete()

Also, don’t forget that the deletion won’t take effect until you commit, as in this snippet:

try:
    num_rows_deleted = db.session.query(Model).delete()
    db.session.commit()
except:
    db.session.rollback()

回答 2

烧瓶法术

删除所有记录

#for all records
db.session.query(Model).delete()
db.session.commit()

删除单行

DB是对象Flask-SQLAlchemy类。它将从中删除所有记录,如果要删除特定记录,则filter查询中的try 子句。例如

#for specific value
db.session.query(Model).filter(Model.id==123).delete()
db.session.commit()

按对象删除单个记录

record_obj = db.session.query(Model).filter(Model.id==123).first()
db.session.delete(record_obj)
db.session.commit()

https://flask-sqlalchemy.palletsprojects.com/en/2.x/queries/#deleting-records

Flask-Sqlalchemy

Delete All Records

#for all records
db.session.query(Model).delete()
db.session.commit()

Deleted Single Row

here DB is the object Flask-SQLAlchemy class. It will delete all records from it and if you want to delete specific records then try filter clause in the query. ex.

#for specific value
db.session.query(Model).filter(Model.id==123).delete()
db.session.commit()

Delete Single Record by Object

record_obj = db.session.query(Model).filter(Model.id==123).first()
db.session.delete(record_obj)
db.session.commit()

https://flask-sqlalchemy.palletsprojects.com/en/2.x/queries/#deleting-records


sqlalchemy模型的已定义列上迭代的方法?

问题:sqlalchemy模型的已定义列上迭代的方法?

我一直在尝试找出如何遍历SQLAlchemy模型中定义的列列表。我希望它为一些模型编写一些序列化和复制方法。我不能仅对其进行迭代,obj.__dict__因为它包含许多SA特定项。

有人知道一种从以下项中获取iddesc名称的方法吗?

class JobStatus(Base):
    __tablename__ = 'jobstatus'

    id = Column(Integer, primary_key=True)
    desc = Column(Unicode(20))

在这种情况下,我可以轻松创建一个:

def logme(self):
    return {'id': self.id, 'desc': self.desc}

但我更喜欢自动生成dict(对于较大的对象)的东西。

I’ve been trying to figure out how to iterate over the list of columns defined in a SQLAlchemy model. I want it for writing some serialization and copy methods to a couple of models. I can’t just iterate over the obj.__dict__ since it contains a lot of SA specific items.

Anyone know of a way to just get the id and desc names from the following?

class JobStatus(Base):
    __tablename__ = 'jobstatus'

    id = Column(Integer, primary_key=True)
    desc = Column(Unicode(20))

In this small case I could easily create a:

def logme(self):
    return {'id': self.id, 'desc': self.desc}

but I’d prefer something that auto-generates the dict (for larger objects).


回答 0

您可以使用以下功能:

def __unicode__(self):
    return "[%s(%s)]" % (self.__class__.__name__, ', '.join('%s=%s' % (k, self.__dict__[k]) for k in sorted(self.__dict__) if '_sa_' != k[:4]))

它将排除SA 魔术属性,但不会排除关系。因此,基本上它可能会加载依赖项,父项,子项等,这绝对是不可取的。

但这实际上要容易得多,因为如果继承自Base,则具有__table__属性,因此您可以执行以下操作:

for c in JobStatus.__table__.columns:
    print c

for c in JobStatus.__table__.foreign_keys:
    print c

请参阅如何从SQLAlchemy映射的对象中发现表属性 -类似的问题。

迈克(Mike)编辑:请参见Mapper.cMapper.mapped_table之类的函数。如果使用0.8或更高版本,还请参见Mapper.attrs和相关函数。

Mapper.attrs的示例:

from sqlalchemy import inspect
mapper = inspect(JobStatus)
for column in mapper.attrs:
    print column.key

You could use the following function:

def __unicode__(self):
    return "[%s(%s)]" % (self.__class__.__name__, ', '.join('%s=%s' % (k, self.__dict__[k]) for k in sorted(self.__dict__) if '_sa_' != k[:4]))

It will exclude SA magic attributes, but will not exclude the relations. So basically it might load the dependencies, parents, children etc, which is definitely not desirable.

But it is actually much easier because if you inherit from Base, you have a __table__ attribute, so that you can do:

for c in JobStatus.__table__.columns:
    print c

for c in JobStatus.__table__.foreign_keys:
    print c

See How to discover table properties from SQLAlchemy mapped object – similar question.

Edit by Mike: Please see functions such as Mapper.c and Mapper.mapped_table. If using 0.8 and higher also see Mapper.attrs and related functions.

Example for Mapper.attrs:

from sqlalchemy import inspect
mapper = inspect(JobStatus)
for column in mapper.attrs:
    print column.key

回答 1

您可以从映射器获取已定义属性的列表。对于您的情况,您仅对ColumnProperty对象感兴趣。

from sqlalchemy.orm import class_mapper
import sqlalchemy

def attribute_names(cls):
    return [prop.key for prop in class_mapper(cls).iterate_properties
        if isinstance(prop, sqlalchemy.orm.ColumnProperty)]

You can get the list of defined properties from the mapper. For your case you’re interested in only ColumnProperty objects.

from sqlalchemy.orm import class_mapper
import sqlalchemy

def attribute_names(cls):
    return [prop.key for prop in class_mapper(cls).iterate_properties
        if isinstance(prop, sqlalchemy.orm.ColumnProperty)]

回答 2

我意识到这是一个古老的问题,但是我遇到了相同的要求,并希望为未来的读者提供替代解决方案。

如Josh所述,完整的SQL字段名称将由返回JobStatus.__table__.columns,因此您将获得jobstatus.id而不是原始的字段名称 id。没有那么有用。

获取最初定义的字段名称列表的解决方案是_data在包含完整数据的列对象上查找属性。如果我们看一下JobStatus.__table__.columns._data,它看起来像这样:

{'desc': Column('desc', Unicode(length=20), table=<jobstatus>),
 'id': Column('id', Integer(), table=<jobstatus>, primary_key=True, nullable=False)}

从这里您可以简单地调用JobStatus.__table__.columns._data.keys()给您一个干净的清单:

['id', 'desc']

I realise that this is an old question, but I’ve just come across the same requirement and would like to offer an alternative solution to future readers.

As Josh notes, full SQL field names will be returned by JobStatus.__table__.columns, so rather than the original field name id, you will get jobstatus.id. Not as useful as it could be.

The solution to obtaining a list of field names as they were originally defined is to look the _data attribute on the column object, which contains the full data. If we look at JobStatus.__table__.columns._data, it looks like this:

{'desc': Column('desc', Unicode(length=20), table=<jobstatus>),
 'id': Column('id', Integer(), table=<jobstatus>, primary_key=True, nullable=False)}

From here you can simply call JobStatus.__table__.columns._data.keys() which gives you a nice, clean list:

['id', 'desc']

回答 3

self.__table__.columns将“仅”为您提供在该特定类中定义的列,即没有继承的列。如果需要全部,请使用self.__mapper__.columns。在您的示例中,我可能会使用以下内容:

class JobStatus(Base):

    ...

    def __iter__(self):
        values = vars(self)
        for attr in self.__mapper__.columns.keys():
            if attr in values:
                yield attr, values[attr]

    def logme(self):
        return dict(self)

self.__table__.columns will “only” give you the columns defined in that particular class, i.e. without inherited ones. if you need all, use self.__mapper__.columns. in your example i’d probably use something like this:

class JobStatus(Base):

    ...

    def __iter__(self):
        values = vars(self)
        for attr in self.__mapper__.columns.keys():
            if attr in values:
                yield attr, values[attr]

    def logme(self):
        return dict(self)

回答 4

假设您正在使用SQLAlchemy的声明性映射,则可以使用__mapper__attribute来获取类映射器。要获取所有映射的属性(包括关系):

obj.__mapper__.attrs.keys()

如果需要严格的列名,请使用obj.__mapper__.column_attrs.keys()。有关其他视图,请参见文档。

https://docs.sqlalchemy.org/zh_CN/latest/orm/mapping_api.html#sqlalchemy.orm.mapper.Mapper.attrs

Assuming you’re using SQLAlchemy’s declarative mapping, you can use __mapper__ attribute to get at the class mapper. To get all mapped attributes (including relationships):

obj.__mapper__.attrs.keys()

If you want strictly column names, use obj.__mapper__.column_attrs.keys(). See the documentation for other views.

https://docs.sqlalchemy.org/en/latest/orm/mapping_api.html#sqlalchemy.orm.mapper.Mapper.attrs


回答 5

为了获得as_dict我所有Class的方法,我使用了一个Mixin使用Ants Aasma描述的技术的Class

class BaseMixin(object):                                                                                                                                                                             
    def as_dict(self):                                                                                                                                                                               
        result = {}                                                                                                                                                                                  
        for prop in class_mapper(self.__class__).iterate_properties:                                                                                                                                 
            if isinstance(prop, ColumnProperty):                                                                                                                                                     
                result[prop.key] = getattr(self, prop.key)                                                                                                                                           
        return result

然后在课堂上像这样使用它

class MyClass(BaseMixin, Base):
    pass

这样,您可以在的实例上调用以下内容MyClass

> myclass = MyClass()
> myclass.as_dict()

希望这可以帮助。


我对此进行了进一步的研究,实际上我需要将实例渲染为HAL对象dict的形式,并带有指向相关对象的链接。因此,我在这里添加了这个小技巧,它将覆盖与上述相同的类的所有属性,不同之处在于,我将更深入地搜索属性并自动生成这些属性。Relaionshiplinks

请注意,这仅适用于具有单个主键的关系

from sqlalchemy.orm import class_mapper, ColumnProperty
from functools import reduce


def deepgetattr(obj, attr):
    """Recurses through an attribute chain to get the ultimate value."""
    return reduce(getattr, attr.split('.'), obj)


class BaseMixin(object):
    def as_dict(self):
        IgnoreInstrumented = (
            InstrumentedList, InstrumentedDict, InstrumentedSet
        )
        result = {}
        for prop in class_mapper(self.__class__).iterate_properties:
            if isinstance(getattr(self, prop.key), IgnoreInstrumented):
                # All reverse relations are assigned to each related instances
                # we don't need to link these, so we skip
                continue
            if isinstance(prop, ColumnProperty):
                # Add simple property to the dictionary with its value
                result[prop.key] = getattr(self, prop.key)
            if isinstance(prop, RelationshipProperty):
                # Construct links relaions
                if 'links' not in result:
                    result['links'] = {}

                # Get value using nested class keys
                value = (
                    deepgetattr(
                        self, prop.key + "." + prop.mapper.primary_key[0].key
                    )
                )
                result['links'][prop.key] = {}
                result['links'][prop.key]['href'] = (
                    "/{}/{}".format(prop.key, value)
                )
        return result

To get an as_dict method on all of my classes I used a Mixin class which uses the technics described by Ants Aasma.

class BaseMixin(object):                                                                                                                                                                             
    def as_dict(self):                                                                                                                                                                               
        result = {}                                                                                                                                                                                  
        for prop in class_mapper(self.__class__).iterate_properties:                                                                                                                                 
            if isinstance(prop, ColumnProperty):                                                                                                                                                     
                result[prop.key] = getattr(self, prop.key)                                                                                                                                           
        return result

And then use it like this in your classes

class MyClass(BaseMixin, Base):
    pass

That way you can invoke the following on an instance of MyClass.

> myclass = MyClass()
> myclass.as_dict()

Hope this helps.


I’ve played arround with this a bit further, I actually needed to render my instances as dict as the form of a HAL object with it’s links to related objects. So I’ve added this little magic down here, which will crawl over all properties of the class same as the above, with the difference that I will crawl deeper into Relaionship properties and generate links for these automatically.

Please note that this will only work for relationships have a single primary key

from sqlalchemy.orm import class_mapper, ColumnProperty
from functools import reduce


def deepgetattr(obj, attr):
    """Recurses through an attribute chain to get the ultimate value."""
    return reduce(getattr, attr.split('.'), obj)


class BaseMixin(object):
    def as_dict(self):
        IgnoreInstrumented = (
            InstrumentedList, InstrumentedDict, InstrumentedSet
        )
        result = {}
        for prop in class_mapper(self.__class__).iterate_properties:
            if isinstance(getattr(self, prop.key), IgnoreInstrumented):
                # All reverse relations are assigned to each related instances
                # we don't need to link these, so we skip
                continue
            if isinstance(prop, ColumnProperty):
                # Add simple property to the dictionary with its value
                result[prop.key] = getattr(self, prop.key)
            if isinstance(prop, RelationshipProperty):
                # Construct links relaions
                if 'links' not in result:
                    result['links'] = {}

                # Get value using nested class keys
                value = (
                    deepgetattr(
                        self, prop.key + "." + prop.mapper.primary_key[0].key
                    )
                )
                result['links'][prop.key] = {}
                result['links'][prop.key]['href'] = (
                    "/{}/{}".format(prop.key, value)
                )
        return result

回答 6

self.__dict__

返回一个dict,其中键是属性名称,其值是对象的值。

/!\有一个补充属性:’_sa_instance_state’,但是您可以处理它:)

self.__dict__

returns a dict where keys are attribute names and values the values of the object.

/!\ there is a supplementary attribute: ‘_sa_instance_state’ but you can handle it :)


回答 7

我知道这是一个古老的问题,但是:

class JobStatus(Base):

    ...

    def columns(self):
        return [col for col in dir(self) if isinstance(col, db.Column)]

然后,获取列名: jobStatus.columns()

那会回来 ['id', 'desc']

然后,您可以遍历,并对列和值进行处理:

for col in jobStatus.colums():
    doStuff(getattr(jobStatus, col))

I know this is an old question, but what about:

class JobStatus(Base):

    ...

    def columns(self):
        return [col for col in dir(self) if isinstance(col, db.Column)]

Then, to get column names: jobStatus.columns()

That would return ['id', 'desc']

Then you can loop through, and do stuff with the columns and values:

for col in jobStatus.colums():
    doStuff(getattr(jobStatus, col))

如何使用SqlAlchemy通过ID查询数据库?

问题:如何使用SqlAlchemy通过ID查询数据库?

我需要按其查询SQLAlchemy数据库 id类似于以下内容

User.query.filter_by(username =’peter’)

但为身份证。我该怎么做呢?[通过Google和SO搜索没有帮助]

I need to query a SQLAlchemy database by its id something similar to

User.query.filter_by(username=’peter’)

but for id. How do I do this? [Searching over Google and SO didn’t help]


回答 0

查询具有一个get函数,该函数支持通过表的主键进行查询,我认为id是这样。

例如,要查询ID为23的对象:

User.query.get(23)

注意:正如其他一些评论者和答案所述,这不仅仅是“对主键执行查询过滤”的简写。根据SQLAlchemy会话的状态,运行此代码可能会查询数据库并返回一个新实例,或者它可能会返回在代码中较早查询的对象的实例,而无需实际查询数据库。如果您尚未这样做,请考虑阅读SQLAlchemy会话上的文档以了解后果。

Query has a get function that supports querying by the primary key of the table, which I assume that id is.

For example, to query for an object with ID of 23:

User.query.get(23)

Note: As a few other commenters and answers have mentioned, this is not simply shorthand for “Perform a query filtering on the primary key”. Depending on the state of the SQLAlchemy session, running this code may query the database and return a new instance, or it may return an instance of an object queried earlier in your code without actually querying the database. If you have not already done so, consider reading the documentation on the SQLAlchemy Session to understand the ramifications.


回答 1

您可以像这样查询ID = 1的用户

session.query(User).get(1)

You can query an User with id = 1 like this

session.query(User).get(1)


回答 2

有时get()并非您所期望的:

如果您的交易已完成:

>>> session.query(User).get(1)
[SQL]: BEGIN (implicit)
[SQL]: SELECT user.id AS user_id, user.name AS user_name, user.fullname AS user_fullname
FROM user
WHERE user.id = ?
[SQL]: (1,)
<User(u'ed', u'Ed Jones')>

如果您正在进行事务处理(get()将在不查询数据库的情况下为您提供结果对象在内存中):

>>> session.query(User).get(1)
<User(u'ed', u'Ed Jones')>

最好使用这个:

>>> session.query(User.name).filter(User.id == 1).first()
[SQL]: SELECT user.name AS user_name
FROM user
WHERE user.id = ?
 LIMIT ? OFFSET ?
[SQL]: (1, 1, 0)
(u'Edwardo',)

get() is not as your expected sometimes:

if your transaction was done:

>>> session.query(User).get(1)
[SQL]: BEGIN (implicit)
[SQL]: SELECT user.id AS user_id, user.name AS user_name, user.fullname AS user_fullname
FROM user
WHERE user.id = ?
[SQL]: (1,)
<User(u'ed', u'Ed Jones')>

if you are in a transaction(get() will give you the result object in memory without query the database):

>>> session.query(User).get(1)
<User(u'ed', u'Ed Jones')>

better to use this:

>>> session.query(User.name).filter(User.id == 1).first()
[SQL]: SELECT user.name AS user_name
FROM user
WHERE user.id = ?
 LIMIT ? OFFSET ?
[SQL]: (1, 1, 0)
(u'Edwardo',)

回答 3

如果您使用的tables reflection话,可能会遇到给定的解决方案问题。(这里的先前解决方案对我不起作用)。

我最终使用的是:

session.query(object._class_).get(id)

object是通过反射从数据库中检索到的,这就是为什么需要使用.__class__

我希望这有帮助。

If you use tables reflection you might have problems with the solutions given. (The previous solutions here didn’t work for me).

What I ended up using was:

session.query(object._class_).get(id)

(object was retrieved by reflection from the database, this is why you need to use .__class__)

I hope this helps.


回答 4

首先,应将其设置id为主键。
然后您可以使用该query.get()方法来查询对象id已经是主键的对象。

由于该query.get()方法通过主键查询对象。
Flask-SQLAlchemy文档推断

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
db = SQLAlchemy()
db.init_app(app)

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)

def test():
    id = 1
    user = User.query.get(id)

First, you should set id as the primary key.
Then you could use the query.get() method to query objects by id which is already the primary key.

Since the query.get() method to query objects by the primary key.
Inferred from Flask-SQLAlchemy documentation

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
db = SQLAlchemy()
db.init_app(app)

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)

def test():
    id = 1
    user = User.query.get(id)

SQLAlchemy-获取表列表

问题:SQLAlchemy-获取表列表

我在文档中找不到与此有关的任何信息,但是如何获得在SQLAlchemy中创建的表的列表?

我使用了类方法来创建表。

I couldn’t find any information about this in the documentation, but how can I get a list of tables created in SQLAlchemy?

I used the class method to create the tables.


回答 0

所有表都收集在tablesSQLAlchemy MetaData对象的属性中。要获取这些表的名称列表:

>>> metadata.tables.keys()
['posts', 'comments', 'users']

如果使用声明性扩展,则可能不是您自己管理元数据。幸运的是,元数据仍然存在于基类中,

>>> Base = sqlalchemy.ext.declarative.declarative_base()
>>> Base.metadata
MetaData(None)

如果您试图弄清楚数据库中存在哪些表,甚至还没有告诉SQLAlchemy的表,那么可以使用表反射。然后,SQLAlchemy将检查数据库并使用所有缺少的表更新元数据。

>>> metadata.reflect(engine)

对于Postgres,如果您有多个架构,则需要遍历引擎中的所有架构:

from sqlalchemy import inspect
inspector = inspect(engine)
schemas = inspector.get_schema_names()

for schema in schemas:
    print("schema: %s" % schema)
    for table_name in inspector.get_table_names(schema=schema):
        for column in inspector.get_columns(table_name, schema=schema):
            print("Column: %s" % column)

All of the tables are collected in the tables attribute of the SQLAlchemy MetaData object. To get a list of the names of those tables:

>>> metadata.tables.keys()
['posts', 'comments', 'users']

If you’re using the declarative extension, then you probably aren’t managing the metadata yourself. Fortunately, the metadata is still present on the baseclass,

>>> Base = sqlalchemy.ext.declarative.declarative_base()
>>> Base.metadata
MetaData(None)

If you are trying to figure out what tables are present in your database, even among the ones you haven’t even told SQLAlchemy about yet, then you can use table reflection. SQLAlchemy will then inspect the database and update the metadata with all of the missing tables.

>>> metadata.reflect(engine)

For Postgres, if you have multiple schemas, you’ll need to loop thru all the schemas in the engine:

from sqlalchemy import inspect
inspector = inspect(engine)
schemas = inspector.get_schema_names()

for schema in schemas:
    print("schema: %s" % schema)
    for table_name in inspector.get_table_names(schema=schema):
        for column in inspector.get_columns(table_name, schema=schema):
            print("Column: %s" % column)

回答 1

engine对象中有一个方法来获取表名称列表。engine.table_names()

There is a method in engine object to fetch the list of tables name. engine.table_names()


回答 2

from sqlalchemy import create_engine
engine = create_engine('postgresql://use:pass@localhost/DBname')
print (engine.table_names())
from sqlalchemy import create_engine
engine = create_engine('postgresql://use:pass@localhost/DBname')
print (engine.table_names())

回答 3

在python解释器中,使用db.engine.table_names()

$ python
>>> from myapp import db
>>> db.engine.table_names()

Within the python interpreter use db.engine.table_names()

$ python
>>> from myapp import db
>>> db.engine.table_names()

回答 4

我一直在寻找这样的东西:

from sqlalchemy import create_engine
eng = create_engine('mysql+pymysql://root:password@localhost:3306', pool_recycle=3600)
q = eng.execute('SHOW TABLES')

available_tables = q.fetchall()

它执行并返回所有表。

更新:

Postgres:

eng = create_engine('postgresql+psycopg2://root:password@localhost/
q = eng.execute('SELECT * FROM pg_catalog.pg_tables')

I was looking for something like this:

from sqlalchemy import create_engine
eng = create_engine('mysql+pymysql://root:password@localhost:3306', pool_recycle=3600)
q = eng.execute('SHOW TABLES')

available_tables = q.fetchall()

It does an execute and returns all of the tables.

update:

Postgres:

eng = create_engine('postgresql+psycopg2://root:password@localhost/
q = eng.execute('SELECT * FROM pg_catalog.pg_tables')

回答 5

用于创建表的元数据对象在字典中具有该对象。

metadata.tables.keys()

The metadata object that you created the tables with has that in a dictionary.

metadata.tables.keys()

回答 6

我正在解决相同的问题,并找到了这篇文章。经过一些尝试运行后,我建议使用下面的方法列出所有表:(由zerocog提及)

metadata = MetaData()
metadata.reflect(bind=engine)
for table in metadata.sorted_tables:
    print(table)

这对于直接处理表很有用,我建议您这样做。

并使用以下代码获取表名:

for table_name in engine.table_names():
    print(table_name)

“ metadata.tables”为表名和表对象提供了一个Dict。这对于快速查询也很有用。

I’m solving same problem and found this post. After some try run, I would suggest use below to list all tables: (mentioned by zerocog)

metadata = MetaData()
metadata.reflect(bind=engine)
for table in metadata.sorted_tables:
    print(table)

This is useful for direct table handling and I feel is recommended.

And use below code to get table names:

for table_name in engine.table_names():
    print(table_name)

“metadata.tables” provides a Dict for table name and Table object. which would also be useful for quick query.


回答 7

一次反映所有表允许您也检索隐藏的表名。我创建了一些临时表,他们出现了

meta = MetaData()
meta.reflect(bind=myengine)
for table in reversed(meta.sorted_tables):
    print table

参考http://docs.sqlalchemy.org/en/latest/core/reflection.html

Reflecting All Tables at Once allows you to retrieve hidden table names too. I created some temporary tables and they showed up with

meta = MetaData()
meta.reflect(bind=myengine)
for table in reversed(meta.sorted_tables):
    print table

Reference http://docs.sqlalchemy.org/en/latest/core/reflection.html


回答 8

就这么简单:

engine.table_names()

另外,要测试表是否存在:

engine.has_table(table_name)

Just this simple:

engine.table_names()

Also, to test whether a table exists:

engine.has_table(table_name)

如何在Alembic升级脚本中执行插入和更新?

问题:如何在Alembic升级脚本中执行插入和更新?

我需要在Alembic升级期间更改数据。

我目前在第一个修订版中有一个“玩家”表:

def upgrade():
    op.create_table('player',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.Unicode(length=200), nullable=False),
        sa.Column('position', sa.Unicode(length=200), nullable=True),
        sa.Column('team', sa.Unicode(length=100), nullable=True)
        sa.PrimaryKeyConstraint('id')
    )

我想介绍一个“团队”表。我创建了第二个修订版:

def upgrade():
    op.create_table('teams',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=80), nullable=False)
    )
    op.add_column('players', sa.Column('team_id', sa.Integer(), nullable=False))

我希望第二次迁移也添加以下数据:

  1. 填充团队表:

    INSERT INTO teams (name) SELECT DISTINCT team FROM players;
  2. 根据players.team名称更新players.team_id:

    UPDATE players AS p JOIN teams AS t SET p.team_id = t.id WHERE p.team = t.name;

如何在升级脚本中执行插入和更新?

I need to alter data during an Alembic upgrade.

I currently have a ‘players’ table in a first revision:

def upgrade():
    op.create_table('player',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.Unicode(length=200), nullable=False),
        sa.Column('position', sa.Unicode(length=200), nullable=True),
        sa.Column('team', sa.Unicode(length=100), nullable=True)
        sa.PrimaryKeyConstraint('id')
    )

I want to introduce a ‘teams’ table. I’ve created a second revision:

def upgrade():
    op.create_table('teams',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=80), nullable=False)
    )
    op.add_column('players', sa.Column('team_id', sa.Integer(), nullable=False))

I would like the second migration to also add the following data:

  1. Populate teams table:

    INSERT INTO teams (name) SELECT DISTINCT team FROM players;
    
  2. Update players.team_id based on players.team name:

    UPDATE players AS p JOIN teams AS t SET p.team_id = t.id WHERE p.team = t.name;
    

How do I execute inserts and updates inside the upgrade script?


回答 0

您需要的是数据迁移,而不是Alembic文档中最普遍的模式迁移

该答案假设您使用声明式(而不是class-Mapper-Table或core)定义模型。使它适应其他形式应该相对简单。

请注意,Alembic提供了一些基本数据功能:op.bulk_insert()op.execute()。如果操作很少,请使用这些操作。如果迁移需要关系或其他复杂的交互作用,我更喜欢使用模型和会话的全部功能,如下所述。

以下是示例迁移脚本,该脚本设置了一些声明性模型,这些声明性模型将用于处理会话中的数据。关键点是:

  1. 使用所需的列定义所需的基本模型。您不需要每一列,只需要主键和将要使用的那些。

  2. 在升级功能中,用于op.get_bind()获取当前连接并与其建立会话。

    • 或者使用bind.execute()SQLAlchemy的较低级别直接编写SQL查询。这对于简单的迁移很有用。
  3. 像通常在应用程序中一样使用模型和会话。

"""create teams table

Revision ID: 169ad57156f0
Revises: 29b4c2bfce6d
Create Date: 2014-06-25 09:00:06.784170
"""

revision = '169ad57156f0'
down_revision = '29b4c2bfce6d'

from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Player(Base):
    __tablename__ = 'players'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False)
    team_name = sa.Column('team', sa.String, nullable=False)
    team_id = sa.Column(sa.Integer, sa.ForeignKey('teams.id'), nullable=False)

    team = orm.relationship('Team', backref='players')


class Team(Base):
    __tablename__ = 'teams'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False, unique=True)


def upgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # create the teams table and the players.team_id column
    Team.__table__.create(bind)
    op.add_column('players', sa.Column('team_id', sa.ForeignKey('teams.id'), nullable=False)

    # create teams for each team name
    teams = {name: Team(name=name) for name in session.query(Player.team).distinct()}
    session.add_all(teams.values())

    # set player team based on team name
    for player in session.query(Player):
        player.team = teams[player.team_name]

    session.commit()

    # don't need team name now that team relationship is set
    op.drop_column('players', 'team')


def downgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # re-add the players.team column
    op.add_column('players', sa.Column('team', sa.String, nullable=False)

    # set players.team based on team relationship
    for player in session.query(Player):
        player.team_name = player.team.name

    session.commit()

    op.drop_column('players', 'team_id')
    op.drop_table('teams')

迁移定义了单独的模型,因为代码中的模型表示数据库的当前状态,而迁移表示过程中的步骤。您的数据库可能沿着该路径处于任何状态,因此模型可能尚未与数据库同步。除非您非常小心,否则直接使用真实模型会导致缺少列,无效数据等问题。更清晰地明确说明要在迁移中使用的列和模型。

What you are asking for is a data migration, as opposed to the schema migration that is most prevalent in the Alembic docs.

This answer assumes you are using declarative (as opposed to class-Mapper-Table or core) to define your models. It should be relatively straightforward to adapt this to the other forms.

Note that Alembic provides some basic data functions: op.bulk_insert() and op.execute(). If the operations are fairly minimal, use those. If the migration requires relationships or other complex interactions, I prefer to use the full power of models and sessions as described below.

The following is an example migration script that sets up some declarative models that will be used to manipulate data in a session. The key points are:

  1. Define the basic models you need, with the columns you’ll need. You don’t need every column, just the primary key and the ones you’ll be using.

  2. Within the upgrade function, use op.get_bind() to get the current connection, and make a session with it.

    • Or use bind.execute() to use SQLAlchemy’s lower level to write SQL queries directly. This is useful for simple migrations.
  3. Use the models and session as you normally would in your application.

"""create teams table

Revision ID: 169ad57156f0
Revises: 29b4c2bfce6d
Create Date: 2014-06-25 09:00:06.784170
"""

revision = '169ad57156f0'
down_revision = '29b4c2bfce6d'

from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Player(Base):
    __tablename__ = 'players'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False)
    team_name = sa.Column('team', sa.String, nullable=False)
    team_id = sa.Column(sa.Integer, sa.ForeignKey('teams.id'), nullable=False)

    team = orm.relationship('Team', backref='players')


class Team(Base):
    __tablename__ = 'teams'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False, unique=True)


def upgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # create the teams table and the players.team_id column
    Team.__table__.create(bind)
    op.add_column('players', sa.Column('team_id', sa.ForeignKey('teams.id'), nullable=False)

    # create teams for each team name
    teams = {name: Team(name=name) for name in session.query(Player.team).distinct()}
    session.add_all(teams.values())

    # set player team based on team name
    for player in session.query(Player):
        player.team = teams[player.team_name]

    session.commit()

    # don't need team name now that team relationship is set
    op.drop_column('players', 'team')


def downgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # re-add the players.team column
    op.add_column('players', sa.Column('team', sa.String, nullable=False)

    # set players.team based on team relationship
    for player in session.query(Player):
        player.team_name = player.team.name

    session.commit()

    op.drop_column('players', 'team_id')
    op.drop_table('teams')

The migration defines separate models because the models in your code represent the current state of the database, while the migrations represent steps along the way. Your database might be in any state along that path, so the models might not sync up with the database yet. Unless you’re very careful, using the real models directly will cause problems with missing columns, invalid data, etc. It’s clearer to explicitly state exactly what columns and models you will use in the migration.


回答 1

您还可以使用直接SQL参见(Alembic操作参考),如以下示例所示:

from alembic import op

# revision identifiers, used by Alembic.
revision = '1ce7873ac4ced2'
down_revision = '1cea0ac4ced2'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands made by andrew ###
    op.execute('UPDATE STOCK SET IN_STOCK = -1 WHERE IN_STOCK IS NULL')
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

You can also use direct SQL see (Alembic Operation Reference) as in the following example:

from alembic import op

# revision identifiers, used by Alembic.
revision = '1ce7873ac4ced2'
down_revision = '1cea0ac4ced2'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands made by andrew ###
    op.execute('UPDATE STOCK SET IN_STOCK = -1 WHERE IN_STOCK IS NULL')
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

回答 2

我建议使用临时表来使用SQLAlchemy核心语句(如官方文档中所述),因为它允许使用不可知论的SQL和pythonic编写,并且也是独立的。对于迁移脚本,SQLAlchemy Core是两全其美的。

这是概念的示例:

from sqlalchemy.sql import table, column
from sqlalchemy import String
from alembic import op

account = table('account',
    column('name', String)
)
op.execute(
    account.update().\\
    where(account.c.name==op.inline_literal('account 1')).\\
        values({'name':op.inline_literal('account 2')})
        )

# If insert is required
from sqlalchemy.sql import insert
from sqlalchemy import orm

session = orm.Session(bind=bind)
bind = op.get_bind()

data = {
    "name": "John",
}
ret = session.execute(insert(account).values(data))
# for use in other insert calls
account_id = ret.lastrowid

I recommend using SQLAlchemy core statements using an ad-hoc table, as detailed in the official documentation, because it allows the use of agnostic SQL and pythonic writing and is also self-contained. SQLAlchemy Core is the best of both worlds for migration scripts.

Here is an example of the concept:

from sqlalchemy.sql import table, column
from sqlalchemy import String
from alembic import op

account = table('account',
    column('name', String)
)
op.execute(
    account.update().\\
    where(account.c.name==op.inline_literal('account 1')).\\
        values({'name':op.inline_literal('account 2')})
        )

# If insert is required
from sqlalchemy.sql import insert
from sqlalchemy import orm

session = orm.Session(bind=bind)
bind = op.get_bind()

data = {
    "name": "John",
}
ret = session.execute(insert(account).values(data))
# for use in other insert calls
account_id = ret.lastrowid

SQLAlchemy ORM转换为Pandas DataFrame

问题:SQLAlchemy ORM转换为Pandas DataFrame

这个话题已经有一段时间没有在这里或其他地方了。是否有将SQLAlchemy <Query object>转换为pandas DataFrame 的解决方案?

Pandas具有使用能力,pandas.read_sql但这需要使用原始SQL。我有两个避免发生这种情况的原因:1)我已经使用ORM拥有了一切(本身就是一个很好的理由),并且2)我正在使用python列表作为查询的一部分(例如:模型类.db.session.query(Item).filter(Item.symbol.in_(add_symbols)在哪里Item)并且add_symbols是列表)。这等效于SQL SELECT ... from ... WHERE ... IN

有什么可能吗?

This topic hasn’t been addressed in a while, here or elsewhere. Is there a solution converting a SQLAlchemy <Query object> to a pandas DataFrame?

Pandas has the capability to use pandas.read_sql but this requires use of raw SQL. I have two reasons for wanting to avoid it: 1) I already have everything using the ORM (a good reason in and of itself) and 2) I’m using python lists as part of the query (eg: .db.session.query(Item).filter(Item.symbol.in_(add_symbols) where Item is my model class and add_symbols is a list). This is the equivalent of SQL SELECT ... from ... WHERE ... IN.

Is anything possible?


回答 0

在大多数情况下,下面的代码应该有效:

df = pd.read_sql(query.statement, query.session.bind)

有关pandas.read_sql参数的更多信息,请参见文档。

Below should work in most cases:

df = pd.read_sql(query.statement, query.session.bind)

See pandas.read_sql documentation for more information on the parameters.


回答 1

为了让新手熊猫程序员更加清楚,这是一个具体示例,

pd.read_sql(session.query(Complaint).filter(Complaint.id == 2).statement,session.bind) 

在这里,我们从id = 2的投诉表(sqlalchemy模型为Complaint)中选择一个投诉

Just to make this more clear for novice pandas programmers, here is a concrete example,

pd.read_sql(session.query(Complaint).filter(Complaint.id == 2).statement,session.bind) 

Here we select a complaint from complaints table (sqlalchemy model is Complaint) with id = 2


回答 2

所选解决方案对我不起作用,因为我不断收到错误消息

AttributeError:’AnnotatedSelect’对象没有属性’lower’

我发现以下工作:

df = pd.read_sql_query(query.statement, engine)

The selected solution didn’t work for me, as I kept getting the error

AttributeError: ‘AnnotatedSelect’ object has no attribute ‘lower’

I found the following worked:

df = pd.read_sql_query(query.statement, engine)

回答 3

如果要使用参数编译查询并说方言特定的参数,请使用以下命令:

c = query.statement.compile(query.session.bind)
df = pandas.read_sql(c.string, query.session.bind, params=c.params)

If you want to compile a query with parameters and dialect specific arguments, use something like this:

c = query.statement.compile(query.session.bind)
df = pandas.read_sql(c.string, query.session.bind, params=c.params)

回答 4

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://postgres:postgres@localhost:5432/DB', echo=False)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()

conn = session.bind

class DailyTrendsTable(Base):

    __tablename__ = 'trends'
    __table_args__ = ({"schema": 'mf_analysis'})

    company_code = Column(DOUBLE_PRECISION, primary_key=True)
    rt_bullish_trending = Column(Integer)
    rt_bearish_trending = Column(Integer)
    rt_bullish_non_trending = Column(Integer)
    rt_bearish_non_trending = Column(Integer)
    gen_date = Column(Date, primary_key=True)

df_query = select([DailyTrendsTable])

df_data = pd.read_sql(rt_daily_query, con = conn)
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://postgres:postgres@localhost:5432/DB', echo=False)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()

conn = session.bind

class DailyTrendsTable(Base):

    __tablename__ = 'trends'
    __table_args__ = ({"schema": 'mf_analysis'})

    company_code = Column(DOUBLE_PRECISION, primary_key=True)
    rt_bullish_trending = Column(Integer)
    rt_bearish_trending = Column(Integer)
    rt_bullish_non_trending = Column(Integer)
    rt_bearish_non_trending = Column(Integer)
    gen_date = Column(Date, primary_key=True)

df_query = select([DailyTrendsTable])

df_data = pd.read_sql(rt_daily_query, con = conn)