标签归档:sqlalchemy

奇怪的SQLAlchemy错误消息:TypeError:’dict’对象不支持索引

问题:奇怪的SQLAlchemy错误消息:TypeError:’dict’对象不支持索引

我正在使用手工SQL,使用SqlAlchemy从PG数据库中获取数据。我正在尝试一个查询,其中包含类似运算符’%’的SQL,并且该查询似乎通过循环抛出SqlAlcjhemy:

sql = """
       SELECT DISTINCT u.name from user u
        INNER JOIN city c ON u.city_id = c.id
        WHERE c.designation=upper('fantasy') 
        AND c.id IN (select id from ref_geog where short_name LIKE '%opt')
      """

# The last line in the above statement throws the error mentioned in the title. 
# However if the last line is change to:
# AND c.id IN (select id from ref_geog where short_name = 'helloopt')
# the script runs correctly.
#
# I also tried double escaping the '%' i.e. using '%%' instead - that generated the same error as previously.

connectDb()
res = executeSql(sql)
print res
closeDbConnection()

谁知道是什么原因导致了这种误导性错误消息,以及我该如何解决?

[[编辑]]

在任何人问之前,上面所包含的功能没有什么特别或特别的地方。例如,函数executeSql()仅调用conn.execute(sql)并返回结果。变量conn只是先前建立的与数据库的连接。

I am using hand crafted SQL to fetch data from a PG database, using SqlAlchemy. I am trying a query which contains the SQL like operator ‘%’ and that seems to throw SqlAlcjhemy through a loop:

sql = """
       SELECT DISTINCT u.name from user u
        INNER JOIN city c ON u.city_id = c.id
        WHERE c.designation=upper('fantasy') 
        AND c.id IN (select id from ref_geog where short_name LIKE '%opt')
      """

# The last line in the above statement throws the error mentioned in the title. 
# However if the last line is change to:
# AND c.id IN (select id from ref_geog where short_name = 'helloopt')
# the script runs correctly.
#
# I also tried double escaping the '%' i.e. using '%%' instead - that generated the same error as previously.

connectDb()
res = executeSql(sql)
print res
closeDbConnection()

Any one knows what is causing this misleading error message and how I may fix it?

[[Edit]]

Before any one asks, there is nothing special or fancy about the functions included above. For example the function executeSql() simply invokes conn.execute(sql) and returns the results. The variable conn is simply the previously established connection to the database.


回答 0

您必须%%使用它,%因为%在python中它被用作字符串格式,因此当您编写单个时,%它会假定您将用此值替换某些值。

因此,当您要%在查询中始终将string放置为all时,请放置double %

You have to give %% to use it as % because % in python is use as string formatting so when you write single % its assume that you are going to replace some value with this.

So when you want to place single % in string with query allways place double %.


回答 1

SQLAlchemy具有text()包装文本的功能,该文本似乎可以为您正确地转义SQL。

res = executeSql(sqlalchemy.text(sql))

应该可以为您工作,并使您不必进行手动转义。

SQLAlchemy has a text() function for wrapping text which appears to correctly escape the SQL for you.

I.e.

res = executeSql(sqlalchemy.text(sql))

should work for you and save you from having to do the manual escaping.


回答 2

我在sqlalchemy 1.2版文档中找不到“ executeSql” ,但以下代码对我有用

engine.execute(sqlalchemy.text(sql_query))

I cannot find the “executeSql” in sqlalchemy version 1.2 docs , but the below line worked for me

engine.execute(sqlalchemy.text(sql_query))

回答 3

看来您的问题可能与此bug有关

在这种情况下,您应该使用三倍转义来解决。

It seems like your problem may be related to this bug.

In which case, you should triple-escape as a workaround.


回答 4

当出现此错误时,我发现了另一种情况:

c.execute("SELECT * FROM t WHERE a = %s")

换句话说,如果您%s在查询中提供参数(),但您忘记添加查询参数。在这种情况下,错误消息非常容易引起误解。

I found one more case when this error shows up:

c.execute("SELECT * FROM t WHERE a = %s")

In other words, if you provide parameter (%s) in query, but you forget to add query params. In this case error message is very misleading.


回答 5

还有一个注意事项-您还必须转义(或删除)%注释中的字符。不幸的是,sqlalchemy.text(query_string)不能逃脱评论中的百分号。

One more note- you must escape (or delete) % characters in comments as well. Unfortunately, sqlalchemy.text(query_string) does not escape the percent signs in the comments.


回答 6

如果您不想转义%字符或使用sqlalchemy.text(),则解决问题的另一种方法是使用正则表达式。

代替:

select id from ref_geog where short_name LIKE '%opt'

尝试(区分大小写的匹配):

select id from ref_geog where short_name ~ 'opt$' 

或(不区分大小写):

select id from ref_geog where short_name ~* 'opt$'

两个LIKE和正则表达式被覆盖中模式匹配的文档

注意:

与LIKE模式不同,除非将正则表达式明确地锚定到字符串的开头或结尾,否则允许在字符串中的任何位置匹配正则表达式。

对于锚,可以将断言$用于字符串的结尾(或^开头)。

Another way of solving your problem, if you don’t want to escape % characters or use sqlalchemy.text(), is to use a regular expression.

Instead of:

select id from ref_geog where short_name LIKE '%opt'

Try (for case-sensitive match):

select id from ref_geog where short_name ~ 'opt$' 

or (for case-insensitive):

select id from ref_geog where short_name ~* 'opt$'

Both LIKE and regex are covered in the documentation on pattern matching.

Note that:

Unlike LIKE patterns, a regular expression is allowed to match anywhere within a string, unless the regular expression is explicitly anchored to the beginning or end of the string.

For an anchor, you can use the assertion $ for end of string (or ^ for beginning).


回答 7

这也可能是由于这种情况-如果要在DICT formate中声明要传递到SQL的参数,并在SQL中以LIST或TUPPLE的形式对其进行操作。

This could also result from the case – in case parameters to be passed onto the SQL are declared in DICT formate and are being manipulated in the SQL in the form of LIST or TUPPLE.


如何在Flask-SQLAlchemy应用中执行原始SQL

问题:如何在Flask-SQLAlchemy应用中执行原始SQL

如何在SQLAlchemy中执行原始SQL?

我有一个在烧瓶上运行的python Web应用程序,并通过SQLAlchemy连接到数据库。

我需要一种运行原始SQL的方法。该查询涉及多个表联接以及内联视图。

我试过了:

connection = db.session.connection()
connection.execute( <sql here> )

但是我不断收到网关错误。

How do you execute raw SQL in SQLAlchemy?

I have a python web app that runs on flask and interfaces to the database through SQLAlchemy.

I need a way to run the raw SQL. The query involves multiple table joins along with Inline views.

I’ve tried:

connection = db.session.connection()
connection.execute( <sql here> )

But I keep getting gateway errors.


回答 0

你有没有尝试过:

result = db.engine.execute("<sql here>")

要么:

from sqlalchemy import text

sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names

Have you tried:

result = db.engine.execute("<sql here>")

or:

from sqlalchemy import text

sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names

回答 1

SQL Alchemy会话对象具有自己的execute方法:

result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

您的所有应用程序查询都应通过会话对象,无论它们是否是原始SQL。这样可以确保由事务适当地管理查询,该事务允许将同一请求中的多个查询作为一个单元提交或回滚。使用引擎连接进行事务处理将使您面临更大的隐患,即可能很难检测到可能导致数据损坏的错误的风险。每个请求应仅与一个事务相关联,并且使用db.session可以确保您的应用程序是这种情况。

另请注意,它execute是为参数化查询设计的。使用:val示例中的参数作为查询的任何输入,以保护自己免受SQL注入攻击。您可以通过传递a dict作为第二个参数来提供这些参数的值,其中每个键都是在查询中显示的参数名称。参数本身的确切语法可能会有所不同,具体取决于您的数据库,但是所有主要的关系数据库都以某种形式支持它们。

假设这是一个SELECT查询,它将返回一个可迭代RowProxy对象。

您可以使用多种技术访问各个列:

for r in result:
    print(r[0]) # Access by positional index
    print(r['my_column']) # Access by column name as a string
    r_dict = dict(r.items()) # convert to dict keyed by column names

就个人而言,我更喜欢将结果转换为namedtuples:

from collections import namedtuple

Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
    print(r.my_column)
    print(r)

如果您不使用Flask-SQLAlchemy扩展,您仍然可以轻松使用会话:

import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session

engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))

s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

SQL Alchemy session objects have their own execute method:

result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

All your application queries should be going through a session object, whether they’re raw SQL or not. This ensures that the queries are properly managed by a transaction, which allows multiple queries in the same request to be committed or rolled back as a single unit. Going outside the transaction using the engine or the connection puts you at much greater risk of subtle, possibly hard to detect bugs that can leave you with corrupted data. Each request should be associated with only one transaction, and using db.session will ensure this is the case for your application.

Also take note that execute is designed for parameterized queries. Use parameters, like :val in the example, for any inputs to the query to protect yourself from SQL injection attacks. You can provide the value for these parameters by passing a dict as the second argument, where each key is the name of the parameter as it appears in the query. The exact syntax of the parameter itself may be different depending on your database, but all of the major relational databases support them in some form.

Assuming it’s a SELECT query, this will return an iterable of RowProxy objects.

You can access individual columns with a variety of techniques:

for r in result:
    print(r[0]) # Access by positional index
    print(r['my_column']) # Access by column name as a string
    r_dict = dict(r.items()) # convert to dict keyed by column names

Personally, I prefer to convert the results into namedtuples:

from collections import namedtuple

Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
    print(r.my_column)
    print(r)

If you’re not using the Flask-SQLAlchemy extension, you can still easily use a session:

import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session

engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))

s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})

回答 2

docs:SQL表达式语言教程-使用文本

例:

from sqlalchemy.sql import text

connection = engine.connect()

# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)

# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
                  text('select * from Employees where EmployeeGroup = :group'), 
                  group = employeeGroup)

# or - notice the requirement to quote 'Staff'
employees = connection.execute(
                  text("select * from Employees where EmployeeGroup = 'Staff'"))


for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')

docs: SQL Expression Language Tutorial – Using Text

example:

from sqlalchemy.sql import text

connection = engine.connect()

# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)

# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
                  text('select * from Employees where EmployeeGroup = :group'), 
                  group = employeeGroup)

# or - notice the requirement to quote 'Staff'
employees = connection.execute(
                  text("select * from Employees where EmployeeGroup = 'Staff'"))


for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')

回答 3

你可以使用SELECT SQL查询的结果from_statement(),并text()如图所示这里。您不必以这种方式处理元组。作为User具有表名的类的示例,users您可以尝试,

from sqlalchemy.sql import text
.
.
.
user = session.query(User).from_statement(
    text("SELECT * FROM users where name=:name")).\
    params(name='ed').all()

return user

You can get the results of SELECT SQL queries using from_statement() and text() as shown here. You don’t have to deal with tuples this way. As an example for a class User having the table name users you can try,

from sqlalchemy.sql import text

user = session.query(User).from_statement(
    text("""SELECT * FROM users where name=:name""")
).params(name="ed").all()

return user

回答 4

result = db.engine.execute(text("<sql here>"))

<sql here>除非您处于autocommit模式,否则执行但不提交。因此,插入和更新不会反映在数据库中。

要在更改后提交,请执行

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))
result = db.engine.execute(text("<sql here>"))

executes the <sql here> but doesn’t commit it unless you’re on autocommit mode. So, inserts and updates wouldn’t reflect in the database.

To commit after the changes, do

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))

回答 5

这是如何从Flask Shell运行SQL查询的简化答案

首先,映射您的模块(如果您的模块/应用程序是principal文件夹中的manage.py,并且您在UNIX操作系统中),请运行:

export FLASK_APP=manage

运行烧瓶壳

flask shell

导入我们需要的::

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
from sqlalchemy import text

运行查询:

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))

这将使用具有应用程序的当前数据库连接。

This is a simplified answer of how to run SQL query from Flask Shell

First, map your module (if your module/app is manage.py in the principal folder and you are in a UNIX Operating system), run:

export FLASK_APP=manage

Run Flask shell

flask shell

Import what we need::

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
from sqlalchemy import text

Run your query:

result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))

This use the currently database connection which has the application.


回答 6

您是否尝试过按照文档connection.execute(text( <sql here> ), <bind params here> )所述使用和绑定参数?这可以帮助解决许多参数格式设置和性能问题。也许网关错误是超时?绑定参数往往会使复杂的查询执行得更快。

Have you tried using connection.execute(text( <sql here> ), <bind params here> ) and bind parameters as described in the docs? This can help solve many parameter formatting and performance problems. Maybe the gateway error is a timeout? Bind parameters tend to make complex queries execute substantially faster.


回答 7

如果你想避免的元组,另一种方式是通过调用firstoneall方法:

query = db.engine.execute("SELECT * FROM blogs "
                           "WHERE id = 1 ")

assert query.first().name == "Welcome to my blog"

If you want to avoid tuples, another way is by calling the first, one or all methods:

query = db.engine.execute("SELECT * FROM blogs "
                           "WHERE id = 1 ")

assert query.first().name == "Welcome to my blog"

在SQLAlchemy中使用OR

问题:在SQLAlchemy中使用OR

我看了看文档,似乎无法找出如何在SQLAlchemy中进行OR查询。我只想执行此查询。

SELECT address FROM addressbook WHERE city='boston' AND (lastname='bulger' OR firstname='whitey')

应该是这样的

addr = session.query(AddressBook).filter(City == "boston").filter(????)

I’ve looked through the docs and I cant seem to find out how to do an OR query in SQLAlchemy. I just want to do this query.

SELECT address FROM addressbook WHERE city='boston' AND (lastname='bulger' OR firstname='whitey')

Should be something like

addr = session.query(AddressBook).filter(City == "boston").filter(????)

回答 0

教程

from sqlalchemy import or_
filter(or_(User.name == 'ed', User.name == 'wendy'))

From the tutorial:

from sqlalchemy import or_
filter(or_(User.name == 'ed', User.name == 'wendy'))

回答 1

SQLAlchemy的重载位运算符&|~因此而不是丑陋和难以读取的前缀语法与or_()and_()(像巴斯蒂安的答案),你可以使用这些运营商:

.filter((AddressBook.lastname == 'bulger') | (AddressBook.firstname == 'whitey'))

请注意,由于按位运算符的优先级,括号不是可选的

因此,您的整个查询可能如下所示:

addr = session.query(AddressBook) \
    .filter(AddressBook.city == "boston") \
    .filter((AddressBook.lastname == 'bulger') | (AddressBook.firstname == 'whitey'))

SQLAlchemy overloads the bitwise operators &, | and ~ so instead of the ugly and hard-to-read prefix syntax with or_() and and_() (like in Bastien’s answer) you can use these operators:

.filter((AddressBook.lastname == 'bulger') | (AddressBook.firstname == 'whitey'))

Note that the parentheses are not optional due to the precedence of the bitwise operators.

So your whole query could look like this:

addr = session.query(AddressBook) \
    .filter(AddressBook.city == "boston") \
    .filter((AddressBook.lastname == 'bulger') | (AddressBook.firstname == 'whitey'))

回答 2

or_() 在未知数量的OR查询组件的情况下,该函数很有用。

例如,假设我们正在创建一个带有少量可选过滤器的REST服务,如果任何过滤器返回true,则该服务应返回记录。另一方面,如果未在请求中定义参数,则我们的查询不应更改。没有or_()功能,我们必须执行以下操作:

query = Book.query
if filter.title and filter.author:
    query = query.filter((Book.title.ilike(filter.title))|(Book.author.ilike(filter.author)))
else if filter.title:
    query = query.filter(Book.title.ilike(filter.title))
else if filter.author:
    query = query.filter(Book.author.ilike(filter.author))

通过or_()功能可以将其重写为:

query = Book.query
not_null_filters = []
if filter.title:
    not_null_filters.append(Book.title.ilike(filter.title))
if filter.author:
    not_null_filters.append(Book.author.ilike(filter.author))

if len(not_null_filters) > 0:
    query = query.filter(or_(*not_null_filters))

or_() function can be useful in case of unknown number of OR query components.

For example, let’s assume that we are creating a REST service with few optional filters, that should return record if any of filters return true. On the other side, if parameter was not defined in a request, our query shouldn’t change. Without or_() function we must do something like this:

query = Book.query
if filter.title and filter.author:
    query = query.filter((Book.title.ilike(filter.title))|(Book.author.ilike(filter.author)))
else if filter.title:
    query = query.filter(Book.title.ilike(filter.title))
else if filter.author:
    query = query.filter(Book.author.ilike(filter.author))

With or_() function it can be rewritten to:

query = Book.query
not_null_filters = []
if filter.title:
    not_null_filters.append(Book.title.ilike(filter.title))
if filter.author:
    not_null_filters.append(Book.author.ilike(filter.author))

if len(not_null_filters) > 0:
    query = query.filter(or_(*not_null_filters))

回答 3

这真的很有帮助。这是任何给定表的实现:

def sql_replace(self, tableobject, dictargs):

    #missing check of table object is valid
    primarykeys = [key.name for key in inspect(tableobject).primary_key]

    filterargs = []
    for primkeys in primarykeys:
        if dictargs[primkeys] is not None:
            filterargs.append(getattr(db.RT_eqmtvsdata, primkeys) == dictargs[primkeys])
        else:
            return

    query = select([db.RT_eqmtvsdata]).where(and_(*filterargs))

    if self.r_ExecuteAndErrorChk2(query)[primarykeys[0]] is not None:
        # update
        filter = and_(*filterargs)
        query = tableobject.__table__.update().values(dictargs).where(filter)
        return self.w_ExecuteAndErrorChk2(query)

    else:
        query = tableobject.__table__.insert().values(dictargs)
        return self.w_ExecuteAndErrorChk2(query)

# example usage
inrow = {'eqmtvs_id': eqmtvsid, 'datetime': dtime, 'param_id': paramid}

self.sql_replace(tableobject=db.RT_eqmtvsdata, dictargs=inrow)

This has been really helpful. Here is my implementation for any given table:

def sql_replace(self, tableobject, dictargs):

    #missing check of table object is valid
    primarykeys = [key.name for key in inspect(tableobject).primary_key]

    filterargs = []
    for primkeys in primarykeys:
        if dictargs[primkeys] is not None:
            filterargs.append(getattr(db.RT_eqmtvsdata, primkeys) == dictargs[primkeys])
        else:
            return

    query = select([db.RT_eqmtvsdata]).where(and_(*filterargs))

    if self.r_ExecuteAndErrorChk2(query)[primarykeys[0]] is not None:
        # update
        filter = and_(*filterargs)
        query = tableobject.__table__.update().values(dictargs).where(filter)
        return self.w_ExecuteAndErrorChk2(query)

    else:
        query = tableobject.__table__.insert().values(dictargs)
        return self.w_ExecuteAndErrorChk2(query)

# example usage
inrow = {'eqmtvs_id': eqmtvsid, 'datetime': dtime, 'param_id': paramid}

self.sql_replace(tableobject=db.RT_eqmtvsdata, dictargs=inrow)

如何将SqlAlchemy结果序列化为JSON?

问题:如何将SqlAlchemy结果序列化为JSON?

Django有一些很好的自动ORM模型从数据库返回到JSON格式的自动序列化。

如何将SQLAlchemy查询结果序列化为JSON格式?

我试过了,jsonpickle.encode但是它编码查询对象本身。我试过了json.dumps(items)但是回来了

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON / XML真的很难吗?没有默认的序列化程序吗?如今,序列化ORM查询结果是非常常见的任务。

我需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示形式。

javascript datagird(JQGrid http://www.trirand.com/blog/)中需要使用JSON / XML格式的SQLAlchemy对象查询结果

Django has some good automatic serialization of ORM models returned from DB to JSON format.

How to serialize SQLAlchemy query result to JSON format?

I tried jsonpickle.encode but it encodes query object itself. I tried json.dumps(items) but it returns

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

Is it really so hard to serialize SQLAlchemy ORM objects to JSON /XML? Isn’t there any default serializer for it? It’s very common task to serialize ORM query results nowadays.

What I need is just to return JSON or XML data representation of SQLAlchemy query result.

SQLAlchemy objects query result in JSON/XML format is needed to be used in javascript datagird (JQGrid http://www.trirand.com/blog/)


回答 0

平面实施

您可以使用如下形式:

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

然后使用以下命令转换为JSON:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将它们设置为“无”)。

它不会自动扩展关系(因为这可能导致自我引用,并永远循环)。

递归的非循环实现

但是,如果您希望永远循环,则可以使用:

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

然后使用以下代码编码对象:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将对所有子项,所有子项以及所有子项进行编码。基本上,可能对整个数据库进行编码。当它到达之前已编码的内容时,会将其编码为“无”。

递归(可能是循环的)选择性实现

另一种可能更好的选择是能够指定要扩展的字段:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

您现在可以通过以下方式调用它:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,仅扩展名为“父母”的SQLAlchemy字段。

A flat implementation

You could use something like this:

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

and then convert to JSON using:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

It will ignore fields that are not encodable (set them to ‘None’).

It doesn’t auto-expand relations (since this could lead to self-references, and loop forever).

A recursive, non-circular implementation

If, however, you’d rather loop forever, you could use:

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

And then encode objects using:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

This would encode all children, and all their children, and all their children… Potentially encode your entire database, basically. When it reaches something its encoded before, it will encode it as ‘None’.

A recursive, possibly-circular, selective implementation

Another alternative, probably better, is to be able to specify the fields you want to expand:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

You can now call it with:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

To only expand SQLAlchemy fields called ‘parents’, for example.


回答 1

您可以将对象输出为字典:

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

然后你用 User.as_dict()序列化对象。

将sqlalchemy行对象转换为python dict中所述

You could just output your object as a dictionary:

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

And then you use User.as_dict() to serialize your object.

As explained in Convert sqlalchemy row object to python dict


回答 2

您可以将RowProxy转换为这样的字典:

 d = dict(row.items())

然后将其序列化为JSON(您将必须为datetime值之类的东西指定编码器),如果您只想要一个记录(而不是相关记录的完整层次结构),这并不难。

json.dumps([(dict(row.items())) for row in rs])

You can convert a RowProxy to a dict like this:

 d = dict(row.items())

Then serialize that to JSON ( you will have to specify an encoder for things like datetime values ) It’s not that hard if you just want one record ( and not a full hierarchy of related records ).

json.dumps([(dict(row.items())) for row in rs])

回答 3

我建议使用棉花糖。它允许您创建序列化器来表示模型实例,并支持关系和嵌套对象。

这是他们文档中的截断示例。采取ORM模型Author

class Author(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))

该类的棉花糖架构如下所示:

class AuthorSchema(Schema):
    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)

    def format_name(self, author):
        return "{}, {}".format(author.last, author.first)

…并像这样使用:

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

…将产生如下输出:

{
        "first": "Tim",
        "formatted_name": "Peters, Tim",
        "id": 1,
        "last": "Peters"
}

看看他们完整的Flask-SQLAlchemy示例

一个名为的库marshmallow-sqlalchemy专门集成了SQLAlchemy和棉花糖。在该库中,上述Author模型的架构如下所示:

class AuthorSchema(ModelSchema):
    class Meta:
        model = Author

集成允许从SQLAlchemy Column类型推断字段类型。

棉花糖-sqlalchemy在这里。

I recommend using marshmallow. It allows you to create serializers to represent your model instances with support to relations and nested objects.

Here is a truncated example from their docs. Take the ORM model, Author:

class Author(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))

A marshmallow schema for that class is constructed like this:

class AuthorSchema(Schema):
    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)

    def format_name(self, author):
        return "{}, {}".format(author.last, author.first)

…and used like this:

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

…would produce an output like this:

{
        "first": "Tim",
        "formatted_name": "Peters, Tim",
        "id": 1,
        "last": "Peters"
}

Have a look at their full Flask-SQLAlchemy Example.

A library called marshmallow-sqlalchemy specifically integrates SQLAlchemy and marshmallow. In that library, the schema for the Author model described above looks like this:

class AuthorSchema(ModelSchema):
    class Meta:
        model = Author

The integration allows the field types to be inferred from the SQLAlchemy Column types.

marshmallow-sqlalchemy here.


回答 4

Python的3.7+和瓶1.1+可以使用内置的数据类

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

/users/现在,该路线将返回用户列表。

[
  {"email": "user1@gmail.com", "id": 1},
  {"email": "user2@gmail.com", "id": 2}
]

自动序列化相关模型

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

来自的回应jsonify(account)就是这样。

{  
   "id":1,
   "users":[  
      {  
         "email":"user1@gmail.com",
         "id":1
      },
      {  
         "email":"user2@gmail.com",
         "id":2
      }
   ]
}

覆盖默认的JSON编码器

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    elif type(o) == datetime.datetime:
      return o.isoformat()
    else:
      return super().default(o)

app.json_encoder = CustomJSONEncoder      

Python 3.7+ and Flask 1.1+ can use the built-in dataclasses package

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

The /users/ route will now return a list of users.

[
  {"email": "user1@gmail.com", "id": 1},
  {"email": "user2@gmail.com", "id": 2}
]

Auto-serialize related models

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

The response from jsonify(account) would be this.

{  
   "id":1,
   "users":[  
      {  
         "email":"user1@gmail.com",
         "id":1
      },
      {  
         "email":"user2@gmail.com",
         "id":2
      }
   ]
}

Overwrite the default JSON Encoder

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    elif type(o) == datetime.datetime:
      return o.isoformat()
    else:
      return super().default(o)

app.json_encoder = CustomJSONEncoder      

回答 5

烧瓶JsonTools包装具有实现JsonSerializableBase为您的模型提供 Base类的实现。

用法:

from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase

Base = declarative_base(cls=(JsonSerializableBase,))

class User(Base):
    #...

现在 User模型可以神奇地序列化了。

如果您的框架不是Flask,则只需获取代码

Flask-JsonTools package has an implementation of JsonSerializableBase Base class for your models.

Usage:

from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase

Base = declarative_base(cls=(JsonSerializableBase,))

class User(Base):
    #...

Now the User model is magically serializable.

If your framework is not Flask, you can just grab the code


回答 6

出于安全原因,您永远不要返回模型的所有字段。我喜欢有选择地选择它们。

现在编码瓶的JSON支持UUID,日期时间和关系数据(以及添加queryquery_class对flask_sqlalchemy db.Model类)。我已更新编码器,如下所示:

app / json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json


    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = {}
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                    try:
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

这样,我可以选择添加一个__json__属性,该属性返回我想编码的字段列表:

app/models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

我将@jsonapi添加到视图中,返回结果列表,然后输出如下:

[

{

    "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
    "song": 

        {
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        },
    "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
    "type": "audio/mpeg"
}

]

For security reasons you should never return all the model’s fields. I prefer to selectively choose them.

Flask’s json encoding now supports UUID, datetime and relationships (and added query and query_class for flask_sqlalchemy db.Model class). I’ve updated the encoder as follows:

app/json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json


    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = {}
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                    try:
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

With this I can optionally add a __json__ property that returns the list of fields I wish to encode:

app/models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

I add @jsonapi to my view, return the resultlist and then my output is as follows:

[

{

    "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
    "song": 

        {
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        },
    "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
    "type": "audio/mpeg"
}

]

回答 7

您可以这样使用SqlAlchemy的自省:

mysql = SQLAlchemy()
from sqlalchemy import inspect

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

从这里的答案中获得启发: 将sqlalchemy行对象转换为python dict

You can use introspection of SqlAlchemy as this :

mysql = SQLAlchemy()
from sqlalchemy import inspect

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

Get inspired from an answer here : Convert sqlalchemy row object to python dict


回答 8

更详细的解释。在模型中,添加:

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()是针对python 3的,因此如果使用python 2则使用unicode()。它应该有助于反序列化日期。如果不处理这些问题,则可以将其删除。

您现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

First()需要避免奇怪的错误。as_dict()现在将反序列化结果。反序列化后,可以将其转换为json

jsonify(some_result)

A more detailed explanation. In your model, add:

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

The str() is for python 3 so if using python 2 use unicode(). It should help deserialize dates. You can remove it if not dealing with those.

You can now query the database like this

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

First() is needed to avoid weird errors. as_dict() will now deserialize the result. After deserialization, it is ready to be turned to json

jsonify(some_result)

回答 9

它不是那么简单。我写了一些代码来做到这一点。我仍在努力,它使用MochiKit框架。它基本上使用代理和已注册的JSON转换器在Python和Javascript之间转换复合对象。

数据库对象的浏览器端是db.js 它需要在基本的Python代理源的proxy.js

在Python方面,有基本的代理模块。最后是webserver.py中的SqlAlchemy对象编码器。它还取决于在models.py文件中找到的元数据提取器。

It is not so straighforward. I wrote some code to do this. I’m still working on it, and it uses the MochiKit framework. It basically translates compound objects between Python and Javascript using a proxy and registered JSON converters.

Browser side for database objects is db.js It needs the basic Python proxy source in proxy.js.

On the Python side there is the base proxy module. Then finally the SqlAlchemy object encoder in webserver.py. It also depends on metadata extractors found in the models.py file.


回答 10

虽然最初的问题可以回溯一会儿,但这里的答案(以及我的经验)表明,这是一个不平凡的问题,它具有许多不同的方法,并具有不同的权衡取舍。

这就是为什么我构建SQLAthanor库,库扩展了SQLAlchemy的声明性ORM,并提供了您可能想看的可配置序列化/反序列化支持。

该库支持:

  • Python 2.7、3.4、3.5和3.6。
  • SQLAlchemy 0.9版或更高版本
  • 到JSON,CSV,YAML和Python的序列化/反序列化 dict
  • 列/属性,关系,混合属性和关联代理的序列化/反序列化
  • 为特定格式和列/关系/属性启用和禁用序列化(例如,您要支持入站 password值,但不要包括值)
  • 序列化前和序列化后的值处理(用于验证或强制类型转换)
  • 非常简单的语法,既Python风格又与SQLAlchemy自己的方法无缝地保持一致

您可以在此处查看(我希望!)全面的文档: https //sqlathanor.readthedocs.io/en/latest

希望这可以帮助!

While the original question goes back awhile, the number of answers here (and my own experiences) suggest it’s a non-trivial question with a lot of different approaches of varying complexity with different trade-offs.

That’s why I built the SQLAthanor library that extends SQLAlchemy’s declarative ORM with configurable serialization/de-serialization support that you might want to take a look at.

The library supports:

  • Python 2.7, 3.4, 3.5, and 3.6.
  • SQLAlchemy versions 0.9 and higher
  • serialization/de-serialization to/from JSON, CSV, YAML, and Python dict
  • serialization/de-serialization of columns/attributes, relationships, hybrid properties, and association proxies
  • enabling and disabling of serialization for particular formats and columns/relationships/attributes (e.g. you want to support an inbound password value, but never include an outbound one)
  • pre-serialization and post-deserialization value processing (for validation or type coercion)
  • a pretty straightforward syntax that is both Pythonic and seamlessly consistent with SQLAlchemy’s own approach

You can check out the (I hope!) comprehensive docs here: https://sqlathanor.readthedocs.io/en/latest

Hope this helps!


回答 11

自定义序列化和反序列化。

“ from_json”(类方法)基于json数据构建Model对象。

“反序列化”只能在实例上调用,并将json中的所有数据合并到Model实例中。

“序列化” -递归序列化

需要__write_only__属性来定义只写属性(例如“ password_hash”)。

class Serializable(object):
    __exclude__ = ('id',)
    __include__ = ()
    __write_only__ = ()

    @classmethod
    def from_json(cls, json, selfObj=None):
        if selfObj is None:
            self = cls()
        else:
            self = selfObj
        exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
        include = cls.__include__ or ()
        if json:
            for prop, value in json.iteritems():
                # ignore all non user data, e.g. only
                if (not (prop in exclude) | (prop in include)) and isinstance(
                        getattr(cls, prop, None), QueryableAttribute):
                    setattr(self, prop, value)
        return self

    def deserialize(self, json):
        if not json:
            return None
        return self.__class__.from_json(json, selfObj=self)

    @classmethod
    def serialize_list(cls, object_list=[]):
        output = []
        for li in object_list:
            if isinstance(li, Serializable):
                output.append(li.serialize())
            else:
                output.append(li)
        return output

    def serialize(self, **kwargs):

        # init write only props
        if len(getattr(self.__class__, '__write_only__', ())) == 0:
            self.__class__.__write_only__ = ()
        dictionary = {}
        expand = kwargs.get('expand', ()) or ()
        prop = 'props'
        if expand:
            # expand all the fields
            for key in expand:
                getattr(self, key)
        iterable = self.__dict__.items()
        is_custom_property_set = False
        # include only properties passed as parameter
        if (prop in kwargs) and (kwargs.get(prop, None) is not None):
            is_custom_property_set = True
            iterable = kwargs.get(prop, None)
        # loop trough all accessible properties
        for key in iterable:
            accessor = key
            if isinstance(key, tuple):
                accessor = key[0]
            if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                # force select from db to be able get relationships
                if is_custom_property_set:
                    getattr(self, accessor, None)
                if isinstance(self.__dict__.get(accessor), list):
                    dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                # check if those properties are read only
                elif isinstance(self.__dict__.get(accessor), Serializable):
                    dictionary[accessor] = self.__dict__.get(accessor).serialize()
                else:
                    dictionary[accessor] = self.__dict__.get(accessor)
        return dictionary

Custom serialization and deserialization.

“from_json” (class method) builds a Model object based on json data.

“deserialize” could be called only on instance, and merge all data from json into Model instance.

“serialize” – recursive serialization

__write_only__ property is needed to define write only properties (“password_hash” for example).

class Serializable(object):
    __exclude__ = ('id',)
    __include__ = ()
    __write_only__ = ()

    @classmethod
    def from_json(cls, json, selfObj=None):
        if selfObj is None:
            self = cls()
        else:
            self = selfObj
        exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
        include = cls.__include__ or ()
        if json:
            for prop, value in json.iteritems():
                # ignore all non user data, e.g. only
                if (not (prop in exclude) | (prop in include)) and isinstance(
                        getattr(cls, prop, None), QueryableAttribute):
                    setattr(self, prop, value)
        return self

    def deserialize(self, json):
        if not json:
            return None
        return self.__class__.from_json(json, selfObj=self)

    @classmethod
    def serialize_list(cls, object_list=[]):
        output = []
        for li in object_list:
            if isinstance(li, Serializable):
                output.append(li.serialize())
            else:
                output.append(li)
        return output

    def serialize(self, **kwargs):

        # init write only props
        if len(getattr(self.__class__, '__write_only__', ())) == 0:
            self.__class__.__write_only__ = ()
        dictionary = {}
        expand = kwargs.get('expand', ()) or ()
        prop = 'props'
        if expand:
            # expand all the fields
            for key in expand:
                getattr(self, key)
        iterable = self.__dict__.items()
        is_custom_property_set = False
        # include only properties passed as parameter
        if (prop in kwargs) and (kwargs.get(prop, None) is not None):
            is_custom_property_set = True
            iterable = kwargs.get(prop, None)
        # loop trough all accessible properties
        for key in iterable:
            accessor = key
            if isinstance(key, tuple):
                accessor = key[0]
            if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                # force select from db to be able get relationships
                if is_custom_property_set:
                    getattr(self, accessor, None)
                if isinstance(self.__dict__.get(accessor), list):
                    dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                # check if those properties are read only
                elif isinstance(self.__dict__.get(accessor), Serializable):
                    dictionary[accessor] = self.__dict__.get(accessor).serialize()
                else:
                    dictionary[accessor] = self.__dict__.get(accessor)
        return dictionary

回答 12

这是一个解决方案,可让您选择想要包含在输出中的关系。注意:这是一个完整的重写,将dict / str作为arg而不是列表。修复一些东西。

def deep_dict(self, relations={}):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations={'family':None})
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations={'family':{'homes':None}})
            OR
            person.deep_dict(relations={'family':'homes'})
        Say homes has a relation like rooms you can do
            person.deep_dict(relations={'family':{'homes':'rooms'}})
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, dict) and not isinstance(relations, str):
        raise Exception("relations should be a dict, it is of type {}".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

因此,例如使用人员/家庭/房屋/房间的示例…将其转换为json,您所需要做的就是

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))

Here is a solution that lets you select the relations you want to include in your output as deep as you would like to go. NOTE: This is a complete re-write taking a dict/str as an arg rather than a list. fixes some stuff..

def deep_dict(self, relations={}):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations={'family':None})
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations={'family':{'homes':None}})
            OR
            person.deep_dict(relations={'family':'homes'})
        Say homes has a relation like rooms you can do
            person.deep_dict(relations={'family':{'homes':'rooms'}})
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, dict) and not isinstance(relations, str):
        raise Exception("relations should be a dict, it is of type {}".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

so for an example using person/family/homes/rooms… turning it into json all you need is

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))

回答 13

def alc2json(row):
    return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

我以为我会和这个打一点代码高尔夫。

仅供参考:我正在使用automap_base因为我们有根据业务需求单独设计的架构。我今天才刚开始使用SQLAlchemy,但是文档指出automap_base是对clarativeative_base的扩展,这似乎是SQLAlchemy ORM中的典型范例,因此我认为这应该可行。

按照Tjorriemorrie的解决方案,使用后跟的外键并不太花哨,但是它只是将列与值匹配,并通过str()-列的值来处理Python类型。我们的值包括Python datetime.time和decimal.Decimal类类型的结果,因此可以完成工作。

希望这对任何路人都有帮助!

def alc2json(row):
    return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

I thought I’d play a little code golf with this one.

FYI: I am using automap_base since we have a separately designed schema according to business requirements. I just started using SQLAlchemy today but the documentation states that automap_base is an extension to declarative_base which seems to be the typical paradigm in the SQLAlchemy ORM so I believe this should work.

It does not get fancy with following foreign keys per Tjorriemorrie‘s solution, but it simply matches columns to values and handles Python types by str()-ing the column values. Our values consist Python datetime.time and decimal.Decimal class type results so it gets the job done.

Hope this helps any passers-by!


回答 14

我知道这是一个比较老的帖子。我接受了@SashaB提供的解决方案,并根据需要进行了修改。

我添加了以下内容:

  1. 字段忽略列表:序列化时要忽略的字段列表
  2. 字段替换列表:字典,其中包含要在序列化时用值替换的字段名称。
  3. 删除了方法并使BaseQuery序列化

我的代码如下:

def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
   """
   Serialize SQLAlchemy result into JSon
   :param revisit_self: True / False
   :param fields_to_expand: Fields which are to be expanded for including their children and all
   :param fields_to_ignore: Fields to be ignored while encoding
   :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
   :return: Json serialized SQLAlchemy object
   """
   _visited_objs = []
   class AlchemyEncoder(json.JSONEncoder):
      def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # don't re-visit self
            if revisit_self:
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

            # go through each field in this SQLalchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                val = obj.__getattribute__(field)
                # is this field method defination, or an SQLalchemy object
                if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                    field_name = fields_to_replace[field] if field in fields_to_replace else field
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or \
                            (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field_name] = None
                            continue

                    fields[field_name] = val
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)
   return AlchemyEncoder

希望它能对某人有所帮助!

I know this is quite an older post. I took solution given by @SashaB and modified as per my need.

I added following things to it:

  1. Field ignore list: A list of fields to be ignored while serializing
  2. Field replace list: A dictionary containing field names to be replaced by values while serializing.
  3. Removed methods and BaseQuery getting serialized

My code is as follows:

def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
   """
   Serialize SQLAlchemy result into JSon
   :param revisit_self: True / False
   :param fields_to_expand: Fields which are to be expanded for including their children and all
   :param fields_to_ignore: Fields to be ignored while encoding
   :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
   :return: Json serialized SQLAlchemy object
   """
   _visited_objs = []
   class AlchemyEncoder(json.JSONEncoder):
      def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # don't re-visit self
            if revisit_self:
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

            # go through each field in this SQLalchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                val = obj.__getattribute__(field)
                # is this field method defination, or an SQLalchemy object
                if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                    field_name = fields_to_replace[field] if field in fields_to_replace else field
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or \
                            (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field_name] = None
                            continue

                    fields[field_name] = val
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)
   return AlchemyEncoder

Hope it helps someone!


回答 15

使用SQLAlchemy中的内置序列化器

from sqlalchemy.ext.serializer import loads, dumps
obj = MyAlchemyObject()
# serialize object
serialized_obj = dumps(obj)

# deserialize object
obj = loads(serialized_obj)

如果要在会话之间转移对象,请记住使用将该对象与当前会话分离session.expunge(obj)。要再次附加它,只需执行即可session.add(obj)

Use the built-in serializer in SQLAlchemy:

from sqlalchemy.ext.serializer import loads, dumps
obj = MyAlchemyObject()
# serialize object
serialized_obj = dumps(obj)

# deserialize object
obj = loads(serialized_obj)

If you’re transferring the object between sessions, remember to detach the object from the current session using session.expunge(obj). To attach it again, just do session.add(obj).


回答 16

以下代码会将sqlalchemy结果序列化为json。

import json
from collections import OrderedDict


def asdict(self):
    result = OrderedDict()
    for key in self.__mapper__.c.keys():
        if getattr(self, key) is not None:
            result[key] = str(getattr(self, key))
        else:
            result[key] = getattr(self, key)
    return result


def to_array(all_vendors):
    v = [ ven.asdict() for ven in all_vendors ]
    return json.dumps(v) 

叫乐,

def all_products():
    all_products = Products.query.all()
    return to_array(all_products)

following code will serialize sqlalchemy result to json.

import json
from collections import OrderedDict


def asdict(self):
    result = OrderedDict()
    for key in self.__mapper__.c.keys():
        if getattr(self, key) is not None:
            result[key] = str(getattr(self, key))
        else:
            result[key] = getattr(self, key)
    return result


def to_array(all_vendors):
    v = [ ven.asdict() for ven in all_vendors ]
    return json.dumps(v) 

Calling fun,

def all_products():
    all_products = Products.query.all()
    return to_array(all_products)

回答 17

AlchemyEncoder很棒,但有时会失败,并使用十进制值。这是解决小数点问题的改进编码器-

class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects 
def default(self, obj):
    if isinstance(obj.__class__, DeclarativeMeta):
        model_fields = {}
        for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
            data = obj.__getattribute__(field)
            print data
            try:
                json.dumps(data)  # this will fail on non-encodable values, like other classes
                model_fields[field] = data
            except TypeError:
                model_fields[field] = None
        return model_fields
    if isinstance(obj, Decimal):
        return float(obj)
    return json.JSONEncoder.default(self, obj)

The AlchemyEncoder is wonderful but sometimes fails with Decimal values. Here is an improved encoder that solves the decimal problem –

class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects 
def default(self, obj):
    if isinstance(obj.__class__, DeclarativeMeta):
        model_fields = {}
        for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
            data = obj.__getattribute__(field)
            print data
            try:
                json.dumps(data)  # this will fail on non-encodable values, like other classes
                model_fields[field] = data
            except TypeError:
                model_fields[field] = None
        return model_fields
    if isinstance(obj, Decimal):
        return float(obj)
    return json.JSONEncoder.default(self, obj)

回答 18

当使用sqlalchemy连接到数据库时,这是一个高度可配置的简单解决方案。使用大熊猫。

import pandas as pd
import sqlalchemy

#sqlalchemy engine configuration
engine = sqlalchemy.create_engine....

def my_function():
  #read in from sql directly into a pandas dataframe
  #check the pandas documentation for additional config options
  sql_DF = pd.read_sql_table("table_name", con=engine)

  # "orient" is optional here but allows you to specify the json formatting you require
  sql_json = sql_DF.to_json(orient="index")

  return sql_json

When using sqlalchemy to connect to a db I this is a simple solution which is highly configurable. Use pandas.

import pandas as pd
import sqlalchemy

#sqlalchemy engine configuration
engine = sqlalchemy.create_engine....

def my_function():
  #read in from sql directly into a pandas dataframe
  #check the pandas documentation for additional config options
  sql_DF = pd.read_sql_table("table_name", con=engine)

  # "orient" is optional here but allows you to specify the json formatting you require
  sql_json = sql_DF.to_json(orient="index")

  return sql_json


回答 19

在Flask下,这可以工作并处理datatime字段,将类型的字段转换
'time': datetime.datetime(2018, 3, 22, 15, 40)
"time": "2018-03-22 15:40:00"

obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

# This to get the JSON body
return json.dumps(obj)

# Or this to get a response object
return jsonify(obj)

Under Flask, this works and handles datatime fields, transforming a field of type
'time': datetime.datetime(2018, 3, 22, 15, 40) into
"time": "2018-03-22 15:40:00":

obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

# This to get the JSON body
return json.dumps(obj)

# Or this to get a response object
return jsonify(obj)

回答 20

带有utf-8的内置串行器扼流圈无法解码某些输入的无效起始字节。相反,我去了:

def row_to_dict(row):
    temp = row.__dict__
    temp.pop('_sa_instance_state', None)
    return temp


def rows_to_list(rows):
    ret_rows = []
    for row in rows:
        ret_rows.append(row_to_dict(row))
    return ret_rows


@website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
def some_api():
    '''
    /some_endpoint
    '''
    rows = rows_to_list(SomeModel.query.all())
    response = app.response_class(
        response=jsonplus.dumps(rows),
        status=200,
        mimetype='application/json'
    )
    return response

The built in serializer chokes with utf-8 cannot decode invalid start byte for some inputs. Instead, I went with:

def row_to_dict(row):
    temp = row.__dict__
    temp.pop('_sa_instance_state', None)
    return temp


def rows_to_list(rows):
    ret_rows = []
    for row in rows:
        ret_rows.append(row_to_dict(row))
    return ret_rows


@website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
def some_api():
    '''
    /some_endpoint
    '''
    rows = rows_to_list(SomeModel.query.all())
    response = app.response_class(
        response=jsonplus.dumps(rows),
        status=200,
        mimetype='application/json'
    )
    return response

回答 21

也许您可以使用这样的类

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import Table


class Custom:
    """Some custom logic here!"""

    __table__: Table  # def for mypy

    @declared_attr
    def __tablename__(cls):  # pylint: disable=no-self-argument
        return cls.__name__  # pylint: disable= no-member

    def to_dict(self) -> Dict[str, Any]:
        """Serializes only column data."""
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

Base = declarative_base(cls=Custom)

class MyOwnTable(Base):
    #COLUMNS!

这样所有对象都有to_dict方法

Maybe you can use a class like this

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import Table


class Custom:
    """Some custom logic here!"""

    __table__: Table  # def for mypy

    @declared_attr
    def __tablename__(cls):  # pylint: disable=no-self-argument
        return cls.__name__  # pylint: disable= no-member

    def to_dict(self) -> Dict[str, Any]:
        """Serializes only column data."""
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

Base = declarative_base(cls=Custom)

class MyOwnTable(Base):
    #COLUMNS!

With that all objects have the to_dict method


回答 22

在使用一些原始sql和未定义的对象时,使用cursor.description似乎可以得到我想要的东西:

with connection.cursor() as cur:
    print(query)
    cur.execute(query)
    for item in cur.fetchall():
        row = {column.name: item[i] for i, column in enumerate(cur.description)}
        print(row)

While using some raw sql and undefined objects, using cursor.description appeared to get what I was looking for:

with connection.cursor() as cur:
    print(query)
    cur.execute(query)
    for item in cur.fetchall():
        row = {column.name: item[i] for i, column in enumerate(cur.description)}
        print(row)

回答 23

step1:
class CNAME:
   ...
   def as_dict(self):
       return {item.name: getattr(self, item.name) for item in self.__table__.columns}

step2:
list = []
for data in session.query(CNAME).all():
    list.append(data.as_dict())

step3:
return jsonify(list)
step1:
class CNAME:
   ...
   def as_dict(self):
       return {item.name: getattr(self, item.name) for item in self.__table__.columns}

step2:
list = []
for data in session.query(CNAME).all():
    list.append(data.as_dict())

step3:
return jsonify(list)

回答 24

我使用(太多?)词典的观点:

def serialize(_query):
    #d = dictionary written to per row
    #D = dictionary d is written to each time, then reset
    #Master = dictionary of dictionaries; the id Key (int, unique from database) 
    from D is used as the Key for the dictionary D entry in Master
    Master = {}
    D = {}
    x = 0
    for u in _query:
        d = u.__dict__
        D = {}
        for n in d.keys():
           if n != '_sa_instance_state':
                    D[n] = d[n]
        x = d['id']
        Master[x] = D
    return Master

与flask(包括jsonify)和flask_sqlalchemy一起运行,以将输出打印为JSON。

使用jsonify(serialize())调用该函数。

适用于到目前为止我尝试过的所有SQLAlchemy查询(运行SQLite3)

My take utilizing (too many?) dictionaries:

def serialize(_query):
    #d = dictionary written to per row
    #D = dictionary d is written to each time, then reset
    #Master = dictionary of dictionaries; the id Key (int, unique from database) 
    from D is used as the Key for the dictionary D entry in Master
    Master = {}
    D = {}
    x = 0
    for u in _query:
        d = u.__dict__
        D = {}
        for n in d.keys():
           if n != '_sa_instance_state':
                    D[n] = d[n]
        x = d['id']
        Master[x] = D
    return Master

Running with flask (including jsonify) and flask_sqlalchemy to print outputs as JSON.

Call the function with jsonify(serialize()).

Works with all SQLAlchemy queries I’ve tried so far (running SQLite3)


Records-面向人类的sql

Records:用于人类的sql™

Records是一个非常简单但功能强大的库,用于对大多数关系数据库进行原始SQL查询

https://farm1.staticflickr.com/569/33085227621_7e8da49b90_k_d.jpg

只需编写SQL即可。没有铃声,没有口哨。使用可用的标准工具,这项常见任务可能会出人意料地困难。该库努力使此工作流尽可能简单,同时提供一个优雅的界面来处理您的查询结果

数据库支持包括RedShift、Postgres、MySQL、SQLite、Oracle和MS-SQL(不包括驱动程序)


☤基础知识

我们知道如何编写SQL,所以让我们将一些内容发送到我们的数据库:

import records

db = records.Database('postgres://...')
rows = db.query('select * from active_users')    # or db.query_file('sqls/active-users.sql')

一次抓取一行:

>>> rows[0]
<Record {"username": "model-t", "active": true, "name": "Henry Ford", "user_email": "model-t@gmail.com", "timezone": "2016-02-06 22:28:23.894202"}>

或者迭代它们:

for r in rows:
    print(r.name, r.user_email)

可以通过多种方式访问值:row.user_emailrow['user_email'],或row[3]

还完全支持包含非字母数字字符(如空格)的字段

或存储您的记录集合的副本以供以后参考:

>>> rows.all()
[<Record {"username": ...}>, <Record {"username": ...}>, <Record {"username": ...}>, ...]

如果您只期待一个结果:

>>> rows.first()
<Record {"username": ...}>

其他选项包括rows.as_dict()rows.as_dict(ordered=True)

☤功能

  • 迭代行被缓存以供将来引用
  • $DATABASE_URL环境变量支持
  • 便利性Database.get_table_names方法
  • 用于导出查询的命令行记录工具
  • 安全参数化:Database.query('life=:everything', everything=42)
  • 查询可以作为字符串或文件名传递,支持的参数
  • 交易记录:t = Database.transaction(); t.commit()
  • 批量操作:Database.bulk_query()&Database.bulk_query_file()

唱片公司自豪地由SQLAlchemyTablib

☤数据导出功能

Record还具有完全的Tablib集成功能,允许您通过一行代码将结果导出为CSV、XLS、JSON、HTML表、YAML或Pandas DataFrames。非常适合与朋友共享数据或生成报告

>>> print(rows.dataset)
username|active|name      |user_email       |timezone
--------|------|----------|-----------------|--------------------------
model-t |True  |Henry Ford|model-t@gmail.com|2016-02-06 22:28:23.894202
...

逗号分隔值(CSV)

>>> print(rows.export('csv'))
username,active,name,user_email,timezone
model-t,True,Henry Ford,model-t@gmail.com,2016-02-06 22:28:23.894202
...

YAML Ain‘t Markup Language(YAML)

>>> print(rows.export('yaml'))
- {active: true, name: Henry Ford, timezone: '2016-02-06 22:28:23.894202', user_email: model-t@gmail.com, username: model-t}
...

JavaScript对象表示法(JSON)

>>> print(rows.export('json'))
[{"username": "model-t", "active": true, "name": "Henry Ford", "user_email": "model-t@gmail.com", "timezone": "2016-02-06 22:28:23.894202"}, ...]

Microsoft Excel(xls,xlsx)

with open('report.xls', 'wb') as f:
    f.write(rows.export('xls'))

熊猫数据帧

>>> rows.export('df')
    username  active       name        user_email                   timezone
0    model-t    True Henry Ford model-t@gmail.com 2016-02-06 22:28:23.894202

你说对了。Tablib的所有其他功能也可用,因此您可以对结果进行排序、添加/删除列/行、删除重复项、转置表格、添加分隔符、按列切片数据等

请参阅Tablib Documentation有关更多详细信息,请参阅

☤安装

当然,推荐的安装方法是pipenv

$ pipenv install records[pandas]
✨🍰✨

☤命令行工具

作为额外的奖励,一个records自动包括命令行工具。以下是使用信息的截图:

Screenshot of Records Command-Line Interface.

☤,谢谢你

感谢您借阅本图书馆!我希望你会觉得它有用

当然,总有改进的空间。请随意……open an issue所以我们可以把唱片做得更好、更强、更快

SQLAlchemy IN子句

问题:SQLAlchemy IN子句

我正在尝试在sqlalchemy中执行此查询

SELECT id, name FROM user WHERE id IN (123, 456)

我想[123, 456]在执行时绑定列表。

I’m trying to do this query in sqlalchemy

SELECT id, name FROM user WHERE id IN (123, 456)

I would like to bind the list [123, 456] at execution time.


回答 0

怎么样

session.query(MyUserClass).filter(MyUserClass.id.in_((123,456))).all()

编辑:没有ORM,它将是

session.execute(
    select(
        [MyUserTable.c.id, MyUserTable.c.name], 
        MyUserTable.c.id.in_((123, 456))
    )
).fetchall()

select()需要两个参数,第一个是要检索的字段列表,第二个是where条件。您可以通过c(或columns)属性访问表对象上的所有字段。

How about

session.query(MyUserClass).filter(MyUserClass.id.in_((123,456))).all()

edit: Without the ORM, it would be

session.execute(
    select(
        [MyUserTable.c.id, MyUserTable.c.name], 
        MyUserTable.c.id.in_((123, 456))
    )
).fetchall()

select() takes two parameters, the first one is a list of fields to retrieve, the second one is the where condition. You can access all fields on a table object via the c (or columns) property.


回答 1

假设您使用声明式样式(即ORM类),则非常简单:

query = db_session.query(User.id, User.name).filter(User.id.in_([123,456]))
results = query.all()

db_session是您的数据库会话,User而是__tablename__等于的ORM类"users"

Assuming you use the declarative style (i.e. ORM classes), it is pretty easy:

query = db_session.query(User.id, User.name).filter(User.id.in_([123,456]))
results = query.all()

db_session is your database session here, while User is the ORM class with __tablename__ equal to "users".


回答 2

另一种方法是将原始SQL模式与SQLAlchemy结合使用,我使用SQLAlchemy 0.9.8,python 2.7,MySQL 5.X和MySQL-Python作为连接器,在这种情况下,需要一个元组。我的代码如下:

id_list = [1, 2, 3, 4, 5] # in most case we have an integer list or set
s = text('SELECT id, content FROM myTable WHERE id IN :id_list')
conn = engine.connect() # get a mysql connection
rs = conn.execute(s, id_list=tuple(id_list)).fetchall()

希望一切对您有用。

An alternative way is using raw SQL mode with SQLAlchemy, I use SQLAlchemy 0.9.8, python 2.7, MySQL 5.X, and MySQL-Python as connector, in this case, a tuple is needed. My code listed below:

id_list = [1, 2, 3, 4, 5] # in most case we have an integer list or set
s = text('SELECT id, content FROM myTable WHERE id IN :id_list')
conn = engine.connect() # get a mysql connection
rs = conn.execute(s, id_list=tuple(id_list)).fetchall()

Hope everything works for you.


回答 3

使用表达式API(基于注释就是这个问题的要求),您可以使用in_相关列的方法。

查询

SELECT id, name FROM user WHERE id in (123,456)

myList = [123, 456]
select = sqlalchemy.sql.select([user_table.c.id, user_table.c.name], user_table.c.id.in_(myList))
result = conn.execute(select)
for row in result:
    process(row)

这假定user_table并且conn已经适当定义。

With the expression API, which based on the comments is what this question is asking for, you can use the in_ method of the relevant column.

To query

SELECT id, name FROM user WHERE id in (123,456)

use

myList = [123, 456]
select = sqlalchemy.sql.select([user_table.c.id, user_table.c.name], user_table.c.id.in_(myList))
result = conn.execute(select)
for row in result:
    process(row)

This assumes that user_table and conn have been defined appropriately.


回答 4

只是想与我在python 3中使用sqlalchemy和pandas共享我的解决方案。也许,有人会觉得它有用。

import sqlalchemy as sa
import pandas as pd
engine = sa.create_engine("postgresql://postgres:my_password@my_host:my_port/my_db")
values = [val1,val2,val3]   
query = sa.text(""" 
                SELECT *
                FROM my_table
                WHERE col1 IN :values; 
""")
query = query.bindparams(values=tuple(values))
df = pd.read_sql(query, engine)

Just wanted to share my solution using sqlalchemy and pandas in python 3. Perhaps, one would find it useful.

import sqlalchemy as sa
import pandas as pd
engine = sa.create_engine("postgresql://postgres:my_password@my_host:my_port/my_db")
values = [val1,val2,val3]   
query = sa.text(""" 
                SELECT *
                FROM my_table
                WHERE col1 IN :values; 
""")
query = query.bindparams(values=tuple(values))
df = pd.read_sql(query, engine)

回答 5

只是上述答案的补充。

如果要使用“ IN”语句执行SQL,则可以执行以下操作:

ids_list = [1,2,3]
query = "SELECT id, name FROM user WHERE id IN %s" 
args = [(ids_list,)] # Don't forget the "comma", to force the tuple
conn.execute(query, args)

两点:

  • IN语句不需要括号(例如“ … IN(%s)”),只需输入“ … IN%s”
  • 强制将ID列表作为元组的一个元素。别忘了“,”:(ids_list,)

编辑请 注意,如果列表的长度为一或零,则将引发错误!

Just an addition to the answers above.

If you want to execute a SQL with an “IN” statement you could do this:

ids_list = [1,2,3]
query = "SELECT id, name FROM user WHERE id IN %s" 
args = [(ids_list,)] # Don't forget the "comma", to force the tuple
conn.execute(query, args)

Two points:

  • There is no need for Parenthesis for the IN statement(like “… IN(%s) “), just put “…IN %s”
  • Force the list of your ids to be one element of a tuple. Don’t forget the ” , ” : (ids_list,)

EDIT Watch out that if the length of list is one or zero this will raise an error!


SQLAlchemy中filter和filter_by之间的区别

问题:SQLAlchemy中filter和filter_by之间的区别

谁能解释SQLAlchemy filterfilter_by函数之间的区别?我应该使用哪一个?

Could anyone explain the difference between filter and filter_by functions in SQLAlchemy? Which one should I be using?


回答 0

filter_by 用于使用常规kwarg对列名称进行简单查询,例如

db.users.filter_by(name='Joe')

可以使用filter,而不使用kwargs,而是使用’==’等号运算符(已在db.users.name对象上重载)来完成相同的操作:

db.users.filter(db.users.name=='Joe')

您还可以使用编写更强大的查询filter,例如以下表达式:

db.users.filter(or_(db.users.name=='Ryan', db.users.country=='England'))

filter_by is used for simple queries on the column names using regular kwargs, like

db.users.filter_by(name='Joe')

The same can be accomplished with filter, not using kwargs, but instead using the ‘==’ equality operator, which has been overloaded on the db.users.name object:

db.users.filter(db.users.name=='Joe')

You can also write more powerful queries using filter, such as expressions like:

db.users.filter(or_(db.users.name=='Ryan', db.users.country=='England'))


回答 1

实际上,我们最初将它们合并在一起,也就是说,有一个类似“过滤器”的方法接受*args**kwargs,您可以在其中传递SQL表达式或关键字参数(或两者)。我实际上发现这更方便,但是人们总是对此感到困惑,因为他们通常仍然会克服column == expression和之间的区别keyword = expression。所以我们将它们分开。

We actually had these merged together originally, i.e. there was a “filter”-like method that accepted *args and **kwargs, where you could pass a SQL expression or keyword arguments (or both). I actually find that a lot more convenient, but people were always confused by it, since they’re usually still getting over the difference between column == expression and keyword = expression. So we split them up.


回答 2

filter_by使用关键字参数,而filter允许使用pythonic过滤参数,例如filter(User.name=="john")

filter_by uses keyword arguments, whereas filter allows pythonic filtering arguments like filter(User.name=="john")


回答 3

它是用于加快查询编写速度的语法糖。它以伪代码实现:

def filter_by(self, **kwargs):
    return self.filter(sql.and_(**kwargs))

对于AND,您可以简单地编写:

session.query(db.users).filter_by(name='Joe', surname='Dodson')

顺便说一句

session.query(db.users).filter(or_(db.users.name=='Ryan', db.users.country=='England'))

可以写成

session.query(db.users).filter((db.users.name=='Ryan') | (db.users.country=='England'))

您也可以通过PK直接通过get方法获取对象:

Users.query.get(123)
# And even by a composite PK
Users.query.get(123, 321)

使用get案例时,重要的是对象可以在没有数据库请求的情况下返回identity map,可以用作缓存(与事务关联)

It is a syntax sugar for faster query writing. Its implementation in pseudocode:

def filter_by(self, **kwargs):
    return self.filter(sql.and_(**kwargs))

For AND you can simply write:

session.query(db.users).filter_by(name='Joe', surname='Dodson')

btw

session.query(db.users).filter(or_(db.users.name=='Ryan', db.users.country=='England'))

can be written as

session.query(db.users).filter((db.users.name=='Ryan') | (db.users.country=='England'))

Also you can get object directly by PK via get method:

Users.query.get(123)
# And even by a composite PK
Users.query.get(123, 321)

When using get case its important that object can be returned without database request from identity map which can be used as cache(associated with transaction)


SQLAlchemy按降序排列?

问题:SQLAlchemy按降序排列?

如何descending在如下所示的SQLAlchemy查询中使用ORDER BY ?

此查询有效,但以升序返回:

query = (model.Session.query(model.Entry)
        .join(model.ClassificationItem)
        .join(model.EnumerationValue)
        .filter_by(id=c.row.id)
        .order_by(model.Entry.amount) # This row :)
        )

如果我尝试:

.order_by(desc(model.Entry.amount))

然后我得到:NameError: global name 'desc' is not defined

How can I use ORDER BY descending in a SQLAlchemy query like the following?

This query works, but returns them in ascending order:

query = (model.Session.query(model.Entry)
        .join(model.ClassificationItem)
        .join(model.EnumerationValue)
        .filter_by(id=c.row.id)
        .order_by(model.Entry.amount) # This row :)
        )

If I try:

.order_by(desc(model.Entry.amount))

then I get: NameError: global name 'desc' is not defined.


回答 0

与FYI一样,您也可以将这些内容指定为列属性。例如,我可能已经做过:

.order_by(model.Entry.amount.desc())

这很方便,因为它避免了import,并且可以在其他地方(例如关系定义等)中使用它。

有关更多信息,您可以参考此

Just as an FYI, you can also specify those things as column attributes. For instance, I might have done:

.order_by(model.Entry.amount.desc())

This is handy since it avoids an import, and you can use it on other places such as in a relation definition, etc.

For more information, you can refer this


回答 1

from sqlalchemy import desc
someselect.order_by(desc(table1.mycol))

来自@ jpmc26的用法

from sqlalchemy import desc
someselect.order_by(desc(table1.mycol))

Usage from @jpmc26


回答 2

您可能要做的另一件事是:

.order_by("name desc")

这将导致:ORDER BY名称desc。这里的缺点是按顺序使用显式列名。

One other thing you might do is:

.order_by("name desc")

This will result in: ORDER BY name desc. The disadvantage here is the explicit column name used in order by.


回答 3

您可以.desc()像这样在查询中使用函数

query = (model.Session.query(model.Entry)
        .join(model.ClassificationItem)
        .join(model.EnumerationValue)
        .filter_by(id=c.row.id)
        .order_by(model.Entry.amount.desc())
        )

这将按金额降序排列,或者

query = session.query(
    model.Entry
).join(
    model.ClassificationItem
).join(
    model.EnumerationValue
).filter_by(
    id=c.row.id
).order_by(
    model.Entry.amount.desc()
)
)

使用SQLAlchemy的desc函数

from sqlalchemy import desc
query = session.query(
    model.Entry
).join(
    model.ClassificationItem
).join(
    model.EnumerationValue
).filter_by(
    id=c.row.id
).order_by(
    desc(model.Entry.amount)
)
)

对于官方文档,请使用链接或检查以下代码段

sqlalchemy.sql.expression.desc(column)产生一个降序的ORDER BY子句元素。

例如:

from sqlalchemy import desc

stmt = select([users_table]).order_by(desc(users_table.c.name))

将产生如下SQL:

SELECT id, name FROM user ORDER BY name DESC

desc()函数是ColumnElement.desc()方法的独立版本,可用于所有SQL表达式,例如:

stmt = select([users_table]).order_by(users_table.c.name.desc())

参数column –一个ColumnElement(例如标量SQL表达式),用于应用desc()操作。

也可以看看

asc()

nullsfirst()

nullslast()

Select.order_by()

You can use .desc() function in your query just like this

query = (model.Session.query(model.Entry)
        .join(model.ClassificationItem)
        .join(model.EnumerationValue)
        .filter_by(id=c.row.id)
        .order_by(model.Entry.amount.desc())
        )

This will order by amount in descending order or

query = session.query(
    model.Entry
).join(
    model.ClassificationItem
).join(
    model.EnumerationValue
).filter_by(
    id=c.row.id
).order_by(
    model.Entry.amount.desc()
)
)

Use of desc function of SQLAlchemy

from sqlalchemy import desc
query = session.query(
    model.Entry
).join(
    model.ClassificationItem
).join(
    model.EnumerationValue
).filter_by(
    id=c.row.id
).order_by(
    desc(model.Entry.amount)
)
)

For official docs please use the link or check below snippet

sqlalchemy.sql.expression.desc(column) Produce a descending ORDER BY clause element.

e.g.:

from sqlalchemy import desc

stmt = select([users_table]).order_by(desc(users_table.c.name))

will produce SQL as:

SELECT id, name FROM user ORDER BY name DESC

The desc() function is a standalone version of the ColumnElement.desc() method available on all SQL expressions, e.g.:

stmt = select([users_table]).order_by(users_table.c.name.desc())

Parameters column – A ColumnElement (e.g. scalar SQL expression) with which to apply the desc() operation.

See also

asc()

nullsfirst()

nullslast()

Select.order_by()


回答 4

你可以试试: .order_by(ClientTotal.id.desc())

session = Session()
auth_client_name = 'client3' 
result_by_auth_client = session.query(ClientTotal).filter(ClientTotal.client ==
auth_client_name).order_by(ClientTotal.id.desc()).all()

for rbac in result_by_auth_client:
    print(rbac.id) 
session.close()

You can try: .order_by(ClientTotal.id.desc())

session = Session()
auth_client_name = 'client3' 
result_by_auth_client = session.query(ClientTotal).filter(ClientTotal.client ==
auth_client_name).order_by(ClientTotal.id.desc()).all()

for rbac in result_by_auth_client:
    print(rbac.id) 
session.close()

回答 5

@Radu答案的补充,与SQL一样,如果您有许多具有相同属性的表,则可以在参数中添加表名。

.order_by("TableName.name desc")

Complementary at @Radu answer, As in SQL, you can add the table name in the parameter if you have many table with the same attribute.

.order_by("TableName.name desc")

SQLAlchemy:flush()和commit()有什么区别?

问题:SQLAlchemy:flush()和commit()有什么区别?

flush()commit()SQLAlchemy 之间有什么区别?

我已经阅读了文档,但没有一个更明智-他们似乎假设了我没有的预见性。

我对它们对内存使用量的影响特别感兴趣。我正在从一系列文件(总共约500万行)中将一些数据加载到数据库中,而我的会话有时会崩溃-这是一个大型数据库,并且一台内存不足的机器。

我想知道我使用的电话太多commit()还是不够flush()-但是如果不真正了解两者之间的区别,很难分辨!

What the difference is between flush() and commit() in SQLAlchemy?

I’ve read the docs, but am none the wiser – they seem to assume a pre-understanding that I don’t have.

I’m particularly interested in their impact on memory usage. I’m loading some data into a database from a series of files (around 5 million rows in total) and my session is occasionally falling over – it’s a large database and a machine with not much memory.

I’m wondering if I’m using too many commit() and not enough flush() calls – but without really understanding what the difference is, it’s hard to tell!


回答 0

会话对象基本上是数据库更改(更新,插入,删除)的正在进行的事务。这些操作在提交之前不会持久化到数据库中(如果您的程序在会话中事务中由于某种原因中止,则其中所有未提交的更改都将丢失)。

会话对象向注册了事务操作session.add(),但是直到session.flush()调用它之前,它们都不会将它们传递给数据库。

session.flush()将一系列操作传达给数据库(插入,更新,删除)。数据库将它们维护为事务中的挂起操作。直到数据库收到当前事务的COMMIT为止,session.commit()所做的更改才会永久保留在磁盘上,或对其他事务可见。

session.commit() 将这些更改提交(持久)到数据库。

flush()始终称为一个呼叫的一部分commit()1)。

当您使用Session对象查询数据库时,查询将同时从数据库及其所保存的未提交事务的已刷新部分返回结果。默认情况下,Session反对autoflush其操作,但是可以禁用它。

希望这个例子可以使这个更加清楚:

#---
s = Session()

s.add(Foo('A')) # The Foo('A') object has been added to the session.
                # It has not been committed to the database yet,
                #   but is returned as part of a query.
print 1, s.query(Foo).all()
s.commit()

#---
s2 = Session()
s2.autoflush = False

s2.add(Foo('B'))
print 2, s2.query(Foo).all() # The Foo('B') object is *not* returned
                             #   as part of this query because it hasn't
                             #   been flushed yet.
s2.flush()                   # Now, Foo('B') is in the same state as
                             #   Foo('A') was above.
print 3, s2.query(Foo).all() 
s2.rollback()                # Foo('B') has not been committed, and rolling
                             #   back the session's transaction removes it
                             #   from the session.
print 4, s2.query(Foo).all()

#---
Output:
1 [<Foo('A')>]
2 [<Foo('A')>]
3 [<Foo('A')>, <Foo('B')>]
4 [<Foo('A')>]

A Session object is basically an ongoing transaction of changes to a database (update, insert, delete). These operations aren’t persisted to the database until they are committed (if your program aborts for some reason in mid-session transaction, any uncommitted changes within are lost).

The session object registers transaction operations with session.add(), but doesn’t yet communicate them to the database until session.flush() is called.

session.flush() communicates a series of operations to the database (insert, update, delete). The database maintains them as pending operations in a transaction. The changes aren’t persisted permanently to disk, or visible to other transactions until the database receives a COMMIT for the current transaction (which is what session.commit() does).

session.commit() commits (persists) those changes to the database.

flush() is always called as part of a call to commit() (1).

When you use a Session object to query the database, the query will return results both from the database and from the flushed parts of the uncommitted transaction it holds. By default, Session objects autoflush their operations, but this can be disabled.

Hopefully this example will make this clearer:

#---
s = Session()

s.add(Foo('A')) # The Foo('A') object has been added to the session.
                # It has not been committed to the database yet,
                #   but is returned as part of a query.
print 1, s.query(Foo).all()
s.commit()

#---
s2 = Session()
s2.autoflush = False

s2.add(Foo('B'))
print 2, s2.query(Foo).all() # The Foo('B') object is *not* returned
                             #   as part of this query because it hasn't
                             #   been flushed yet.
s2.flush()                   # Now, Foo('B') is in the same state as
                             #   Foo('A') was above.
print 3, s2.query(Foo).all() 
s2.rollback()                # Foo('B') has not been committed, and rolling
                             #   back the session's transaction removes it
                             #   from the session.
print 4, s2.query(Foo).all()

#---
Output:
1 [<Foo('A')>]
2 [<Foo('A')>]
3 [<Foo('A')>, <Foo('B')>]
4 [<Foo('A')>]

回答 1

正如@snapshoe所说

flush() 将您的SQL语句发送到数据库

commit() 提交交易。

时间session.autocommit == False

commit()flush()如果您设置,会打电话autoflush == True

时间session.autocommit == True

commit()如果您尚未开始交易,则无法调用(您可能没有,因为您可能仅使用此模式来避免手动管理交易)。

在这种模式下,您必须调用flush()以保存您的ORM更改。刷新还会有效地提交您的数据。

As @snapshoe says

flush() sends your SQL statements to the database

commit() commits the transaction.

When session.autocommit == False:

commit() will call flush() if you set autoflush == True.

When session.autocommit == True:

You can’t call commit() if you haven’t started a transaction (which you probably haven’t since you would probably only use this mode to avoid manually managing transactions).

In this mode, you must call flush() to save your ORM changes. The flush effectively also commits your data.


回答 2

如果可以提交,为什么还要刷新?

作为刚接触数据库和sqlalchemy的新手,以前的答案- flush()将SQL语句发送到数据库并commit()保留它们-对我来说还不清楚。这些定义很有意义,但是从定义中并不清楚为什么您要使用冲洗而不是仅仅提交。

由于提交始终会刷新(https://docs.sqlalchemy.org/en/13/orm/session_basics.html#committing),因此听起来确实很相似。我认为要强调的大问题是,刷新不是永久的并且可以撤消,而提交是永久的,在某种意义上,您不能要求数据库撤消上一次提交(我认为)

@snapshoe突出显示,如果要查询数据库并获取包含新添加对象的结果,则需要先刷新(或提交,然后将为您刷新)。也许这对某些人有用,尽管我不确定为什么您要刷新而不是提交(除了可以撤消的琐碎回答之外)。

在另一个示例中,我正在本地数据库和远程服务器之间同步文档,并且如果用户决定取消,则应撤消所有添加/更新/删除操作(即,不进行部分同步,仅进行完全同步)。更新单个文档时,我决定只删除旧行,然后从远程服务器添加更新的版本。事实证明,由于sqlalchemy的编写方式,不能保证提交时的操作顺序。这导致添加了一个重复版本(在尝试删除旧版本之前),这导致数据库无法通过唯一约束。为了解决这个问题,我使用flush()了顺序,但是如果以后同步过程失败,我仍然可以撤消操作。

在以下位置查看我的帖子:在sqlalchemy中提交时,添加和删除是否有任何顺序

同样,有人想知道提交时是否保持添加顺序,即如果我先添加object1然后添加object2是否在将对象添加到会话时将SQLAlchemy保存object1到数据库之前object2

同样,这里假定使用flush()将确保所需的行为。因此,总而言之,刷新的一种用途是提供顺序保证(我认为),同时又允许自己使用提交不提供的“撤消”选项。

自动刷新和自动提交

注意,自动刷新可用于确保查询对更新的数据库起作用,因为sqlalchemy将在执行查询之前刷新。https://docs.sqlalchemy.org/zh/13/orm/session_api.html#sqlalchemy.orm.session.Session.params.autoflush

自动提交是我尚不完全了解的其他东西,但听起来不鼓励使用它:https : //docs.sqlalchemy.org/en/13/orm/session_api.html#sqlalchemy.orm.session.Session.params。自动提交

内存使用情况

现在,最初的问题实际上是想了解刷新与提交对内存的影响。由于持久性或不持久性是数据库提供的(我认为),因此仅刷新就足以卸载数据库-尽管如果您不关心撤消操作,提交也不会受到损害(实际上可能会有所帮助-见下文)。 。

sqlalchemy对已刷新的对象使用弱引用:https : //docs.sqlalchemy.org/en/13/orm/session_state_management.html#session-referencing-behavior

这意味着,如果您没有将对象明确保留在某个地方(例如列表或dict中),则sqlalchemy不会将其保留在内存中。

但是,这时您需要担心数据库方面的问题。可能在未提交的情况下进行刷新会带来一些内存损失,以维护事务。同样,我对此并不陌生,但是这里的链接似乎恰恰表明了这一点:https : //stackoverflow.com/a/15305650/764365

换句话说,尽管应该在内存和性能之间进行权衡,但是提交应该减少内存的使用。换句话说,您可能不想一次提交每一个数据库更改(出于性能原因),但是等待太长时间会增加内存使用率。

Why flush if you can commit?

As someone new to working with databases and sqlalchemy, the previous answers – that flush() sends SQL statements to the DB and commit() persists them – were not clear to me. The definitions make sense but it isn’t immediately clear from the definitions why you would use a flush instead of just committing.

Since a commit always flushes (https://docs.sqlalchemy.org/en/13/orm/session_basics.html#committing) these sound really similar. I think the big issue to highlight is that a flush is not permanent and can be undone, whereas a commit is permanent, in the sense that you can’t ask the database to undo the last commit (I think)

@snapshoe highlights that if you want to query the database and get results that include newly added objects, you need to have flushed first (or committed, which will flush for you). Perhaps this is useful for some people although I’m not sure why you would want to flush rather than commit (other than the trivial answer that it can be undone).

In another example I was syncing documents between a local DB and a remote server, and if the user decided to cancel, all adds/updates/deletes should be undone (i.e. no partial sync, only a full sync). When updating a single document I’ve decided to simply delete the old row and add the updated version from the remote server. It turns out that due to the way sqlalchemy is written, order of operations when committing is not guaranteed. This resulted in adding a duplicate version (before attempting to delete the old one), which resulted in the DB failing a unique constraint. To get around this I used flush() so that order was maintained, but I could still undo if later the sync process failed.

See my post on this at: Is there any order for add versus delete when committing in sqlalchemy

Similarly, someone wanted to know whether add order is maintained when committing, i.e. if I add object1 then add object2, does object1 get added to the database before object2 Does SQLAlchemy save order when adding objects to session?

Again, here presumably the use of a flush() would ensure the desired behavior. So in summary, one use for flush is to provide order guarantees (I think), again while still allowing yourself an “undo” option that commit does not provide.

Autoflush and Autocommit

Note, autoflush can be used to ensure queries act on an updated database as sqlalchemy will flush before executing the query. https://docs.sqlalchemy.org/en/13/orm/session_api.html#sqlalchemy.orm.session.Session.params.autoflush

Autocommit is something else that I don’t completely understand but it sounds like its use is discouraged: https://docs.sqlalchemy.org/en/13/orm/session_api.html#sqlalchemy.orm.session.Session.params.autocommit

Memory Usage

Now the original question actually wanted to know about the impact of flush vs. commit for memory purposes. As the ability to persist or not is something the database offers (I think), simply flushing should be sufficient to offload to the database – although committing shouldn’t hurt (actually probably helps – see below) if you don’t care about undoing.

sqlalchemy uses weak referencing for objects that have been flushed: https://docs.sqlalchemy.org/en/13/orm/session_state_management.html#session-referencing-behavior

This means if you don’t have an object explicitly held onto somewhere, like in a list or dict, sqlalchemy won’t keep it in memory.

However, then you have the database side of things to worry about. Presumably flushing without committing comes with some memory penalty to maintain the transaction. Again, I’m new to this but here’s a link that seems to suggest exactly this: https://stackoverflow.com/a/15305650/764365

In other words, commits should reduce memory usage, although presumably there is a trade-off between memory and performance here. In other words, you probably don’t want to commit every single database change, one at a time (for performance reasons), but waiting too long will increase memory usage.


回答 3

这并不能严格回答最初的问题,但是有些人提到session.autoflush = True不必与您一起使用session.flush()……而且并非总是如此。

如果要在事务中间使用新创建的对象的ID,则必须调用session.flush()

# Given a model with at least this id
class AModel(Base):
   id = Column(Integer, primary_key=True)  # autoincrement by default on integer primary key

session.autoflush = True

a = AModel()
session.add(a)
a.id  # None
session.flush()
a.id  # autoincremented integer

这是因为autoflush自动填写ID(尽管对象的查询将,这有时会导致混乱,如“为什么这个作品在这里,但不是吗?”但是,snapshoe已涵盖这一部分)。


一个相关方面对我来说似乎很重要,但并未真正提及:

为什么不一直承诺呢?-答案是原子性

可以这么说:所有操作必须全部成功执行,否则任何一个都不会生效。

例如,如果要创建/更新/删除某个对象(A),然后创建/更新/删除另一个对象(B),但是如果(B)失败,则要还原(A)。这意味着这两个操作是原子操作。

因此,如果(B)需要(A)的结果,则要flush在(A)commit之后和(B)之后调用。

另外,如果是session.autoflush is True,除了我上面提到的情况或Jimbo的回答中的其他情况外,您将不需要flush手动调用。

This does not strictly answer the original question but some people have mentioned that with session.autoflush = True you don’t have to use session.flush()… And this is not always true.

If you want to use the id of a newly created object in the middle of a transaction, you must call session.flush().

# Given a model with at least this id
class AModel(Base):
   id = Column(Integer, primary_key=True)  # autoincrement by default on integer primary key

session.autoflush = True

a = AModel()
session.add(a)
a.id  # None
session.flush()
a.id  # autoincremented integer

This is because autoflush does NOT auto fill the id (although a query of the object will, which sometimes can cause confusion as in “why this works here but not there?” But snapshoe already covered this part).


One related aspect that seems pretty important to me and wasn’t really mentioned:

Why would you not commit all the time? – The answer is atomicity.

A fancy word to say: an ensemble of operations have to all be executed successfully OR none of them will take effect.

For example, if you want to create/update/delete some object (A) and then create/update/delete another (B), but if (B) fails you want to revert (A). This means those 2 operations are atomic.

Therefore, if (B) needs a result of (A), you want to call flush after (A) and commit after (B).

Also, if session.autoflush is True, except for the case that I mentioned above or others in Jimbo‘s answer, you will not need to call flush manually.


Sqlalchemy-Python数据库工具包

Python SQL工具包和对象关系映射器

引言

SQLAlChemy是Python SQL工具包和对象关系映射器,它为应用程序开发人员提供了SQL的全部功能和灵活性。SQLAlChemy提供了一整套众所周知的企业级持久化模式,专为高效和高性能的数据库访问而设计,并改编成简单的Python域语言

SQLAlChemy的主要功能包括:

  • 工业实力ORM,从身份图、工作单元和数据映射器模式上的核心构建。这些模式允许使用声明性配置系统透明地持久化对象。域模型可以自然地构建和操作,并且更改会自动与当前事务同步
  • 面向关系的查询系统,根据对象模型显式公开SQL的全部功能,包括联接、子查询、关联和几乎所有其他功能。使用ORM编写查询使用的关系组合技术与编写SQL时使用的技术相同。虽然您可以随时使用文字SQL,但实际上从来不需要它
  • 一个全面而灵活的系统,可以立即加载相关集合和对象。集合缓存在会话中,并且可以在单个访问时使用联接一次加载,也可以通过跨整个结果集的每个集合查询一次加载
  • 核心SQL构建系统和DBAPI交互层。SQLAlChemy Core独立于ORM,本身就是一个完整的数据库抽象层,包括可扩展的基于Python的SQL表达式语言、模式元数据、连接池、类型强制和自定义类型
  • 所有主键和外键约束都假定是复合的和自然的。当然,代理整数主键仍然是规范,但是SQLAlChemy从不假定或硬编码此模型
  • 数据库自省和生成。数据库模式可以在一个步骤中“反映”到表示数据库元数据的Python结构中;然后,这些相同的结构可以立即生成CREATE语句-所有这些语句都在Core中,独立于ORM

SQLAlChemy的理念:

  • SQL数据库的行为越来越不像对象集合,大小和性能越重要;对象集合的行为越不像表和行,抽象就越重要。SQLAlChemy旨在同时满足这两个原则
  • ORM不需要隐藏“R”。关系数据库提供了丰富的、基于集合的功能,这些功能应该完全公开。SQLAlChemy的ORM提供了一组开放式模式,允许开发人员在域模型和关系模式之间构建自定义中介层,将所谓的“对象关系阻抗”问题变成遥远的记忆
  • 在所有情况下,开发人员都要做出关于对象模型和关系模式的设计、结构和命名约定的所有决策。SQLAlChemy仅提供自动执行这些决策的方法
  • 使用SQLAlChemy,不存在“ORM生成错误的查询”这样的事情-您保留对查询结构的完全控制,包括如何组织联接、如何使用子查询和关联、请求哪些列。SQLAlChemy所做的一切最终都是开发人员发起的决策的结果
  • 如果问题不需要ORM,请不要使用ORM。SQLAlChemy由一个核心和单独的ORM组件组成。Core提供了完整的SQL表达式语言,允许以Pythonic方式构造SQL构造,这些构造直接呈现为目标数据库的SQL字符串,返回本质上是增强型DBAPI游标的结果集
  • 交易应该是常态。使用SQLAlChemy的ORM,在调用Commit()之前,不会将任何内容放到永久存储中。SQLAlChemy鼓励应用程序创建描述一系列操作的开始和结束的一致方法
  • 千万不要在SQL语句中呈现文字值。最大程度地使用了绑定参数,从而允许查询优化器有效地缓存查询计划,并使SQL注入攻击不再成为问题

文档

最新文档位于:

https://www.sqlalchemy.org/docs/

安装/要求

有关安装的完整文档,请访问Installation

获取帮助/开发/错误报告

请参阅SQLAlchemy Community Guide

行为规范

最重要的是,SQLAlChemy非常重视用户和开发人员之间的礼貌、深思熟虑和建设性的交流。请参阅我们当前的行为准则,网址为Code of Conduct

许可证

SQLAlChemy分布在MIT license