问题:如何将SqlAlchemy结果序列化为JSON?
Django有一些很好的自动ORM模型从数据库返回到JSON格式的自动序列化。
如何将SQLAlchemy查询结果序列化为JSON格式?
我试过了,jsonpickle.encode
但是它编码查询对象本身。我试过了json.dumps(items)
但是回来了
TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable
将SQLAlchemy ORM对象序列化为JSON / XML真的很难吗?没有默认的序列化程序吗?如今,序列化ORM查询结果是非常常见的任务。
我需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示形式。
javascript datagird(JQGrid http://www.trirand.com/blog/)中需要使用JSON / XML格式的SQLAlchemy对象查询结果
回答 0
平面实施
您可以使用如下形式:
from sqlalchemy.ext.declarative import DeclarativeMeta
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# an SQLAlchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
try:
json.dumps(data) # this will fail on non-encodable values, like other classes
fields[field] = data
except TypeError:
fields[field] = None
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
然后使用以下命令转换为JSON:
c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)
它将忽略不可编码的字段(将它们设置为“无”)。
它不会自动扩展关系(因为这可能导致自我引用,并永远循环)。
递归的非循环实现
但是,如果您希望永远循环,则可以使用:
from sqlalchemy.ext.declarative import DeclarativeMeta
def new_alchemy_encoder():
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if obj in _visited_objs:
return None
_visited_objs.append(obj)
# an SQLAlchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
fields[field] = obj.__getattribute__(field)
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
return AlchemyEncoder
然后使用以下代码编码对象:
print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)
这将对所有子项,所有子项以及所有子项进行编码。基本上,可能对整个数据库进行编码。当它到达之前已编码的内容时,会将其编码为“无”。
递归(可能是循环的)选择性实现
另一种可能更好的选择是能够指定要扩展的字段:
def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if revisit_self:
if obj in _visited_objs:
return None
_visited_objs.append(obj)
# go through each field in this SQLalchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
val = obj.__getattribute__(field)
# is this field another SQLalchemy object, or a list of SQLalchemy objects?
if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
# unless we're expanding this field, stop here
if field not in fields_to_expand:
# not expanding this field: set it to None and continue
fields[field] = None
continue
fields[field] = val
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
return AlchemyEncoder
您现在可以通过以下方式调用它:
print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)
例如,仅扩展名为“父母”的SQLAlchemy字段。
回答 1
您可以将对象输出为字典:
class User:
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
然后你用 User.as_dict()
序列化对象。
回答 2
您可以将RowProxy转换为这样的字典:
d = dict(row.items())
然后将其序列化为JSON(您将必须为datetime
值之类的东西指定编码器),如果您只想要一个记录(而不是相关记录的完整层次结构),这并不难。
json.dumps([(dict(row.items())) for row in rs])
回答 3
我建议使用棉花糖。它允许您创建序列化器来表示模型实例,并支持关系和嵌套对象。
这是他们文档中的截断示例。采取ORM模型Author
:
class Author(db.Model):
id = db.Column(db.Integer, primary_key=True)
first = db.Column(db.String(80))
last = db.Column(db.String(80))
该类的棉花糖架构如下所示:
class AuthorSchema(Schema):
id = fields.Int(dump_only=True)
first = fields.Str()
last = fields.Str()
formatted_name = fields.Method("format_name", dump_only=True)
def format_name(self, author):
return "{}, {}".format(author.last, author.first)
…并像这样使用:
author_schema = AuthorSchema()
author_schema.dump(Author.query.first())
…将产生如下输出:
{
"first": "Tim",
"formatted_name": "Peters, Tim",
"id": 1,
"last": "Peters"
}
看看他们完整的Flask-SQLAlchemy示例。
一个名为的库marshmallow-sqlalchemy
专门集成了SQLAlchemy和棉花糖。在该库中,上述Author
模型的架构如下所示:
class AuthorSchema(ModelSchema):
class Meta:
model = Author
集成允许从SQLAlchemy Column
类型推断字段类型。
回答 4
Python的3.7+和瓶1.1+可以使用内置的数据类包
from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
@dataclass
class User(db.Model):
id: int
email: str
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
email = db.Column(db.String(200), unique=True)
@app.route('/users/')
def users():
users = User.query.all()
return jsonify(users)
if __name__ == "__main__":
users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
db.create_all()
db.session.add_all(users)
db.session.commit()
app.run()
/users/
现在,该路线将返回用户列表。
[
{"email": "user1@gmail.com", "id": 1},
{"email": "user2@gmail.com", "id": 2}
]
自动序列化相关模型
@dataclass
class Account(db.Model):
id: int
users: User
id = db.Column(db.Integer)
users = db.relationship(User) # User model would need a db.ForeignKey field
来自的回应jsonify(account)
就是这样。
{
"id":1,
"users":[
{
"email":"user1@gmail.com",
"id":1
},
{
"email":"user2@gmail.com",
"id":2
}
]
}
覆盖默认的JSON编码器
from flask.json import JSONEncoder
class CustomJSONEncoder(JSONEncoder):
"Add support for serializing timedeltas"
def default(o):
if type(o) == datetime.timedelta:
return str(o)
elif type(o) == datetime.datetime:
return o.isoformat()
else:
return super().default(o)
app.json_encoder = CustomJSONEncoder
回答 5
烧瓶JsonTools包装具有实现JsonSerializableBase为您的模型提供 Base类的实现。
用法:
from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase
Base = declarative_base(cls=(JsonSerializableBase,))
class User(Base):
#...
现在 User
模型可以神奇地序列化了。
如果您的框架不是Flask,则只需获取代码
回答 6
出于安全原因,您永远不要返回模型的所有字段。我喜欢有选择地选择它们。
现在编码瓶的JSON支持UUID,日期时间和关系数据(以及添加query
和query_class
对flask_sqlalchemy db.Model
类)。我已更新编码器,如下所示:
app / json_encoder.py
from sqlalchemy.ext.declarative import DeclarativeMeta
from flask import json
class AlchemyEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o.__class__, DeclarativeMeta):
data = {}
fields = o.__json__() if hasattr(o, '__json__') else dir(o)
for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
value = o.__getattribute__(field)
try:
json.dumps(value)
data[field] = value
except TypeError:
data[field] = None
return data
return json.JSONEncoder.default(self, o)
app/__init__.py
# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder
这样,我可以选择添加一个__json__
属性,该属性返回我想编码的字段列表:
app/models.py
class Queue(db.Model):
id = db.Column(db.Integer, primary_key=True)
song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
song = db.relationship('Song', lazy='joined')
type = db.Column(db.String(20), server_default=u'audio/mpeg')
src = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, server_default=db.func.now())
updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())
def __init__(self, song):
self.song = song
self.src = song.full_path
def __json__(self):
return ['song', 'src', 'type', 'created_at']
我将@jsonapi添加到视图中,返回结果列表,然后输出如下:
[
{
"created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
"song":
{
"full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
"id": 2,
"path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
},
"src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
"type": "audio/mpeg"
}
]
回答 7
您可以这样使用SqlAlchemy的自省:
mysql = SQLAlchemy()
from sqlalchemy import inspect
class Contacts(mysql.Model):
__tablename__ = 'CONTACTS'
id = mysql.Column(mysql.Integer, primary_key=True)
first_name = mysql.Column(mysql.String(128), nullable=False)
last_name = mysql.Column(mysql.String(128), nullable=False)
phone = mysql.Column(mysql.String(128), nullable=False)
email = mysql.Column(mysql.String(128), nullable=False)
street = mysql.Column(mysql.String(128), nullable=False)
zip_code = mysql.Column(mysql.String(128), nullable=False)
city = mysql.Column(mysql.String(128), nullable=False)
def toDict(self):
return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }
@app.route('/contacts',methods=['GET'])
def getContacts():
contacts = Contacts.query.all()
contactsArr = []
for contact in contacts:
contactsArr.append(contact.toDict())
return jsonify(contactsArr)
@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
contact = Contacts.query.get(id)
return jsonify(contact.toDict())
从这里的答案中获得启发: 将sqlalchemy行对象转换为python dict
回答 8
更详细的解释。在模型中,添加:
def as_dict(self):
return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}
这str()
是针对python 3的,因此如果使用python 2则使用unicode()
。它应该有助于反序列化日期。如果不处理这些问题,则可以将其删除。
您现在可以像这样查询数据库
some_result = User.query.filter_by(id=current_user.id).first().as_dict()
First()
需要避免奇怪的错误。as_dict()
现在将反序列化结果。反序列化后,可以将其转换为json
jsonify(some_result)
回答 9
它不是那么简单。我写了一些代码来做到这一点。我仍在努力,它使用MochiKit框架。它基本上使用代理和已注册的JSON转换器在Python和Javascript之间转换复合对象。
数据库对象的浏览器端是db.js 它需要在基本的Python代理源的proxy.js。
在Python方面,有基本的代理模块。最后是webserver.py中的SqlAlchemy对象编码器。它还取决于在models.py文件中找到的元数据提取器。
回答 10
虽然最初的问题可以回溯一会儿,但这里的答案(以及我的经验)表明,这是一个不平凡的问题,它具有许多不同的方法,并具有不同的权衡取舍。
这就是为什么我构建SQLAthanor库,库扩展了SQLAlchemy的声明性ORM,并提供了您可能想看的可配置序列化/反序列化支持。
该库支持:
- Python 2.7、3.4、3.5和3.6。
- SQLAlchemy 0.9版或更高版本
- 到JSON,CSV,YAML和Python的序列化/反序列化
dict
- 列/属性,关系,混合属性和关联代理的序列化/反序列化
- 为特定格式和列/关系/属性启用和禁用序列化(例如,您要支持入站
password
值,但不要包括出站值) - 序列化前和序列化后的值处理(用于验证或强制类型转换)
- 非常简单的语法,既Python风格又与SQLAlchemy自己的方法无缝地保持一致
您可以在此处查看(我希望!)全面的文档: https //sqlathanor.readthedocs.io/en/latest
希望这可以帮助!
回答 11
自定义序列化和反序列化。
“ from_json”(类方法)基于json数据构建Model对象。
“反序列化”只能在实例上调用,并将json中的所有数据合并到Model实例中。
“序列化” -递归序列化
需要__write_only__属性来定义只写属性(例如“ password_hash”)。
class Serializable(object):
__exclude__ = ('id',)
__include__ = ()
__write_only__ = ()
@classmethod
def from_json(cls, json, selfObj=None):
if selfObj is None:
self = cls()
else:
self = selfObj
exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
include = cls.__include__ or ()
if json:
for prop, value in json.iteritems():
# ignore all non user data, e.g. only
if (not (prop in exclude) | (prop in include)) and isinstance(
getattr(cls, prop, None), QueryableAttribute):
setattr(self, prop, value)
return self
def deserialize(self, json):
if not json:
return None
return self.__class__.from_json(json, selfObj=self)
@classmethod
def serialize_list(cls, object_list=[]):
output = []
for li in object_list:
if isinstance(li, Serializable):
output.append(li.serialize())
else:
output.append(li)
return output
def serialize(self, **kwargs):
# init write only props
if len(getattr(self.__class__, '__write_only__', ())) == 0:
self.__class__.__write_only__ = ()
dictionary = {}
expand = kwargs.get('expand', ()) or ()
prop = 'props'
if expand:
# expand all the fields
for key in expand:
getattr(self, key)
iterable = self.__dict__.items()
is_custom_property_set = False
# include only properties passed as parameter
if (prop in kwargs) and (kwargs.get(prop, None) is not None):
is_custom_property_set = True
iterable = kwargs.get(prop, None)
# loop trough all accessible properties
for key in iterable:
accessor = key
if isinstance(key, tuple):
accessor = key[0]
if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
# force select from db to be able get relationships
if is_custom_property_set:
getattr(self, accessor, None)
if isinstance(self.__dict__.get(accessor), list):
dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
# check if those properties are read only
elif isinstance(self.__dict__.get(accessor), Serializable):
dictionary[accessor] = self.__dict__.get(accessor).serialize()
else:
dictionary[accessor] = self.__dict__.get(accessor)
return dictionary
回答 12
这是一个解决方案,可让您选择想要包含在输出中的关系。注意:这是一个完整的重写,将dict / str作为arg而不是列表。修复一些东西。
def deep_dict(self, relations={}):
"""Output a dict of an SA object recursing as deep as you want.
Takes one argument, relations which is a dictionary of relations we'd
like to pull out. The relations dict items can be a single relation
name or deeper relation names connected by sub dicts
Example:
Say we have a Person object with a family relationship
person.deep_dict(relations={'family':None})
Say the family object has homes as a relation then we can do
person.deep_dict(relations={'family':{'homes':None}})
OR
person.deep_dict(relations={'family':'homes'})
Say homes has a relation like rooms you can do
person.deep_dict(relations={'family':{'homes':'rooms'}})
and so on...
"""
mydict = dict((c, str(a)) for c, a in
self.__dict__.items() if c != '_sa_instance_state')
if not relations:
# just return ourselves
return mydict
# otherwise we need to go deeper
if not isinstance(relations, dict) and not isinstance(relations, str):
raise Exception("relations should be a dict, it is of type {}".format(type(relations)))
# got here so check and handle if we were passed a dict
if isinstance(relations, dict):
# we were passed deeper info
for left, right in relations.items():
myrel = getattr(self, left)
if isinstance(myrel, list):
mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
else:
mydict[left] = myrel.deep_dict(relations=right)
# if we get here check and handle if we were passed a string
elif isinstance(relations, str):
# passed a single item
myrel = getattr(self, relations)
left = relations
if isinstance(myrel, list):
mydict[left] = [rel.deep_dict(relations=None)
for rel in myrel]
else:
mydict[left] = myrel.deep_dict(relations=None)
return mydict
因此,例如使用人员/家庭/房屋/房间的示例…将其转换为json,您所需要做的就是
json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))
回答 13
def alc2json(row):
return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])
我以为我会和这个打一点代码高尔夫。
仅供参考:我正在使用automap_base因为我们有根据业务需求单独设计的架构。我今天才刚开始使用SQLAlchemy,但是文档指出automap_base是对clarativeative_base的扩展,这似乎是SQLAlchemy ORM中的典型范例,因此我认为这应该可行。
按照Tjorriemorrie的解决方案,使用后跟的外键并不太花哨,但是它只是将列与值匹配,并通过str()-列的值来处理Python类型。我们的值包括Python datetime.time和decimal.Decimal类类型的结果,因此可以完成工作。
希望这对任何路人都有帮助!
回答 14
我知道这是一个比较老的帖子。我接受了@SashaB提供的解决方案,并根据需要进行了修改。
我添加了以下内容:
- 字段忽略列表:序列化时要忽略的字段列表
- 字段替换列表:字典,其中包含要在序列化时用值替换的字段名称。
- 删除了方法并使BaseQuery序列化
我的代码如下:
def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
"""
Serialize SQLAlchemy result into JSon
:param revisit_self: True / False
:param fields_to_expand: Fields which are to be expanded for including their children and all
:param fields_to_ignore: Fields to be ignored while encoding
:param fields_to_replace: Field keys to be replaced by values assigned in dictionary
:return: Json serialized SQLAlchemy object
"""
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if revisit_self:
if obj in _visited_objs:
return None
_visited_objs.append(obj)
# go through each field in this SQLalchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
val = obj.__getattribute__(field)
# is this field method defination, or an SQLalchemy object
if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
field_name = fields_to_replace[field] if field in fields_to_replace else field
# is this field another SQLalchemy object, or a list of SQLalchemy objects?
if isinstance(val.__class__, DeclarativeMeta) or \
(isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
# unless we're expanding this field, stop here
if field not in fields_to_expand:
# not expanding this field: set it to None and continue
fields[field_name] = None
continue
fields[field_name] = val
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
return AlchemyEncoder
希望它能对某人有所帮助!
回答 15
使用SQLAlchemy中的内置序列化器:
from sqlalchemy.ext.serializer import loads, dumps
obj = MyAlchemyObject()
# serialize object
serialized_obj = dumps(obj)
# deserialize object
obj = loads(serialized_obj)
如果要在会话之间转移对象,请记住使用将该对象与当前会话分离session.expunge(obj)
。要再次附加它,只需执行即可session.add(obj)
。
回答 16
以下代码会将sqlalchemy结果序列化为json。
import json
from collections import OrderedDict
def asdict(self):
result = OrderedDict()
for key in self.__mapper__.c.keys():
if getattr(self, key) is not None:
result[key] = str(getattr(self, key))
else:
result[key] = getattr(self, key)
return result
def to_array(all_vendors):
v = [ ven.asdict() for ven in all_vendors ]
return json.dumps(v)
叫乐,
def all_products():
all_products = Products.query.all()
return to_array(all_products)
回答 17
AlchemyEncoder很棒,但有时会失败,并使用十进制值。这是解决小数点问题的改进编码器-
class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
model_fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
print data
try:
json.dumps(data) # this will fail on non-encodable values, like other classes
model_fields[field] = data
except TypeError:
model_fields[field] = None
return model_fields
if isinstance(obj, Decimal):
return float(obj)
return json.JSONEncoder.default(self, obj)
回答 18
当使用sqlalchemy连接到数据库时,这是一个高度可配置的简单解决方案。使用大熊猫。
import pandas as pd
import sqlalchemy
#sqlalchemy engine configuration
engine = sqlalchemy.create_engine....
def my_function():
#read in from sql directly into a pandas dataframe
#check the pandas documentation for additional config options
sql_DF = pd.read_sql_table("table_name", con=engine)
# "orient" is optional here but allows you to specify the json formatting you require
sql_json = sql_DF.to_json(orient="index")
return sql_json
回答 19
在Flask下,这可以工作并处理datatime字段,将类型的字段转换'time': datetime.datetime(2018, 3, 22, 15, 40)
为"time": "2018-03-22 15:40:00"
:
obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}
# This to get the JSON body
return json.dumps(obj)
# Or this to get a response object
return jsonify(obj)
回答 20
带有utf-8的内置串行器扼流圈无法解码某些输入的无效起始字节。相反,我去了:
def row_to_dict(row):
temp = row.__dict__
temp.pop('_sa_instance_state', None)
return temp
def rows_to_list(rows):
ret_rows = []
for row in rows:
ret_rows.append(row_to_dict(row))
return ret_rows
@website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
def some_api():
'''
/some_endpoint
'''
rows = rows_to_list(SomeModel.query.all())
response = app.response_class(
response=jsonplus.dumps(rows),
status=200,
mimetype='application/json'
)
return response
回答 21
也许您可以使用这样的类
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import Table
class Custom:
"""Some custom logic here!"""
__table__: Table # def for mypy
@declared_attr
def __tablename__(cls): # pylint: disable=no-self-argument
return cls.__name__ # pylint: disable= no-member
def to_dict(self) -> Dict[str, Any]:
"""Serializes only column data."""
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
Base = declarative_base(cls=Custom)
class MyOwnTable(Base):
#COLUMNS!
这样所有对象都有to_dict
方法
回答 22
在使用一些原始sql和未定义的对象时,使用cursor.description
似乎可以得到我想要的东西:
with connection.cursor() as cur:
print(query)
cur.execute(query)
for item in cur.fetchall():
row = {column.name: item[i] for i, column in enumerate(cur.description)}
print(row)
回答 23
step1:
class CNAME:
...
def as_dict(self):
return {item.name: getattr(self, item.name) for item in self.__table__.columns}
step2:
list = []
for data in session.query(CNAME).all():
list.append(data.as_dict())
step3:
return jsonify(list)
回答 24
我使用(太多?)词典的观点:
def serialize(_query):
#d = dictionary written to per row
#D = dictionary d is written to each time, then reset
#Master = dictionary of dictionaries; the id Key (int, unique from database)
from D is used as the Key for the dictionary D entry in Master
Master = {}
D = {}
x = 0
for u in _query:
d = u.__dict__
D = {}
for n in d.keys():
if n != '_sa_instance_state':
D[n] = d[n]
x = d['id']
Master[x] = D
return Master
与flask(包括jsonify)和flask_sqlalchemy一起运行,以将输出打印为JSON。
使用jsonify(serialize())调用该函数。
适用于到目前为止我尝试过的所有SQLAlchemy查询(运行SQLite3)