How to serialize SqlAlchemy result to JSON?

Question: How to serialize SqlAlchemy result to JSON?

Django has some good automatic serialization of ORM models returned from DB to JSON format.

How to serialize SQLAlchemy query result to JSON format?

I tried jsonpickle.encode, but it encodes the query object itself. I tried json.dumps(items), but it raises

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

Is it really so hard to serialize SQLAlchemy ORM objects to JSON/XML? Isn’t there a default serializer for it? Serializing ORM query results is a very common task nowadays.

What I need is just to return JSON or XML data representation of SQLAlchemy query result.

The SQLAlchemy query result is needed in JSON/XML format for use in a JavaScript datagrid (JQGrid http://www.trirand.com/blog/).


Answer 0

A flat implementation

You could use something like this:

import json

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

and then convert to JSON using:

c = YourAlchemyClass()
print(json.dumps(c, cls=AlchemyEncoder))

Fields that are not JSON-encodable are not dropped; their values are set to None (null in the JSON output).

It doesn’t auto-expand relations (since this could lead to self-references, and loop forever).
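
To see what "set to None" means in practice, here is a minimal sketch that applies the same probe-and-fallback idea to a plain Python object. Product and flat_fields are hypothetical names used only for illustration, and no SQLAlchemy is involved, so this is not the encoder above, just the same technique in isolation.

```python
import json


class Product(object):
    """Hypothetical stand-in for a mapped class (not a real SQLAlchemy model)."""

    def __init__(self, id, name, owner):
        self.id = id
        self.name = name
        self.owner = owner  # an arbitrary object, not JSON-encodable


def flat_fields(obj):
    """Copy public attributes, replacing non-encodable values with None."""
    fields = {}
    for field, value in vars(obj).items():
        if field.startswith('_'):
            continue
        try:
            json.dumps(value)  # probe: raises TypeError if not encodable
            fields[field] = value
        except TypeError:
            fields[field] = None
    return fields


p = Product(3, 'some name', owner=object())
encoded = json.dumps(flat_fields(p), sort_keys=True)
print(encoded)  # {"id": 3, "name": "some name", "owner": null}
```

The key "owner" is still present in the output, which is worth knowing if the consumer distinguishes a missing key from a null value.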

A recursive, non-circular implementation

If, however, you would rather follow relationships recursively, with a guard that stops the encoder from looping forever, you could use:

import json

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

And then encode objects using:

print(json.dumps(e, cls=new_alchemy_encoder(), check_circular=False))

This would encode all children, and all their children, and all their children… It can potentially encode your entire database. When it reaches something it has encoded before, it will encode it as ‘None’.
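
The visited-list guard can be tried without a database. The following is a rough sketch on plain objects; Node and new_node_encoder are hypothetical names, and the identity check (is) is my assumption, chosen so that the example does not depend on how a class defines equality.

```python
import json


class Node(object):
    """Hypothetical stand-in for a model with a self-referencing relationship."""

    def __init__(self, name):
        self.name = name
        self.children = []
        self.parent = None


def new_node_encoder():
    visited = []  # fresh list for every encoder class that is created

    class NodeEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, Node):
                # objects that were already encoded once become null
                if any(obj is seen for seen in visited):
                    return None
                visited.append(obj)
                return {'name': obj.name,
                        'parent': obj.parent,
                        'children': obj.children}
            return json.JSONEncoder.default(self, obj)

    return NodeEncoder


root = Node('root')
child = Node('child')
child.parent = root
root.children.append(child)

# check_circular=False is needed: otherwise json raises ValueError when it
# meets root a second time, before default() gets a chance to return None.
encoded = json.dumps(root, cls=new_node_encoder(),
                     check_circular=False, sort_keys=True)
print(encoded)
```

The child's "parent" comes out as null because root was already visited. Calling the factory again for each json.dumps call matters, since the visited list lives as long as the encoder class does.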

A recursive, possibly-circular, selective implementation

Another alternative, probably better, is to be able to specify the fields you want to expand:

import json

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder(revisit_self=False, fields_to_expand=()):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self (as written, this guard is active only when revisit_self is True)
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

You can now call it with:

print(json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False))

This expands only the SQLAlchemy fields called ‘parents’, for example.


Answer 1

You could just output your object as a dictionary:

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

Then call as_dict() on a User instance (for example, user.as_dict()) and serialize the resulting dictionary.

This is explained in “Convert sqlalchemy row object to python dict”.


Answer 2

You can convert a RowProxy to a dict like this:

 d = dict(row.items())

Then serialize that to JSON (you will have to specify an encoder for things like datetime values). It’s not that hard if you just want one record (and not a full hierarchy of related records).

json.dumps([(dict(row.items())) for row in rs])
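
The encoder for datetime values mentioned above can be a small fallback function passed through the default parameter of json.dumps. This is only a sketch: encode_extra and the sample rows are hypothetical, and the choice of ISO 8601 strings for dates and plain strings for Decimal is an assumption, not something the answer prescribes. In SQLAlchemy 1.4 and later, where RowProxy was replaced by Row, dict(row._mapping) should play the role of dict(row.items()).

```python
import datetime
import decimal
import json


def encode_extra(value):
    """Fallback for json.dumps: called only for values json cannot encode."""
    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    if isinstance(value, decimal.Decimal):
        return str(value)
    raise TypeError('%r is not JSON serializable' % (value,))


# Hypothetical rows, shaped like the result of dict(row.items()).
rows = [
    {'id': 1,
     'price': decimal.Decimal('9.99'),
     'created': datetime.datetime(2015, 7, 23, 11, 36, 53)},
]

encoded = json.dumps(rows, default=encode_extra, sort_keys=True)
print(encoded)  # [{"created": "2015-07-23T11:36:53", "id": 1, "price": "9.99"}]
```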

Answer 3

I recommend using marshmallow. It allows you to create serializers to represent your model instances, with support for relations and nested objects.

Here is a truncated example from their docs. Take the ORM model, Author:

class Author(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))

A marshmallow schema for that class is constructed like this:

from marshmallow import Schema, fields

class AuthorSchema(Schema):
    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)

    def format_name(self, author):
        return "{}, {}".format(author.last, author.first)

…and used like this:

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

…would produce an output like this:

{
        "first": "Tim",
        "formatted_name": "Peters, Tim",
        "id": 1,
        "last": "Peters"
}

Have a look at their full Flask-SQLAlchemy Example.

A library called marshmallow-sqlalchemy specifically integrates SQLAlchemy and marshmallow. In that library, the schema for the Author model described above looks like this:

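# ModelSchema comes from older marshmallow-sqlalchemy releases;
# newer releases provide SQLAlchemyAutoSchema for the same purpose.
from marshmallow_sqlalchemy import ModelSchema

class AuthorSchema(ModelSchema):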
    class Meta:
        model = Author

The integration allows the field types to be inferred from the SQLAlchemy Column types.

See the marshmallow-sqlalchemy project for details.


Answer 4

With Python 3.7+ and Flask 1.1+, you can use the built-in dataclasses module:

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, autoincrement=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

The /users/ route will now return a list of users.

[
  {"email": "user1@gmail.com", "id": 1},
  {"email": "user2@gmail.com", "id": 2}
]
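
As far as I understand it, this works because Flask's JSON encoder (from 1.1 on) converts dataclass instances with dataclasses.asdict. The sketch below reproduces that idea with the standard library only; this User is a hypothetical plain dataclass without db.Model, and encode_dataclass is an illustrative name, not part of Flask.

```python
import dataclasses
import json


@dataclasses.dataclass
class User:
    """Hypothetical plain dataclass; the real model also inherits db.Model."""
    id: int
    email: str


def encode_dataclass(value):
    """Fallback for json.dumps: turn dataclass instances into dicts."""
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return dataclasses.asdict(value)
    raise TypeError('%r is not JSON serializable' % (value,))


users = [User(1, 'user1@gmail.com'), User(2, 'user2@gmail.com')]
encoded = json.dumps(users, default=encode_dataclass, sort_keys=True)
print(encoded)
```

Only attributes that carry a type annotation become dataclass fields, which is why the model above declares id: int and email: str in addition to the db.Column definitions.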

Auto-serialize related models

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer, primary_key=True)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

The response from jsonify(account) would be this.

{  
   "id":1,
   "users":[  
      {  
         "email":"user1@gmail.com",
         "id":1
      },
      {  
         "email":"user2@gmail.com",
         "id":2
      }
   ]
}

Overwrite the default JSON Encoder

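import datetime

# flask.json.JSONEncoder and app.json_encoder exist in Flask 1.1 through 2.2;
# they were removed in Flask 2.3 in favor of app.json (a JSON provider).
from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(self, o):
    if isinstance(o, datetime.timedelta):
      return str(o)
    elif isinstance(o, datetime.datetime):
      return o.isoformat()
    else:
      return super().default(o)

app.json_encoder = CustomJSONEncoder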

Answer 5

The Flask-JsonTools package provides a JsonSerializableBase base class for your models.

Usage:

from sqlalchemy.ext.declarative import declarative_base
from flask_jsontools import JsonSerializableBase

Base = declarative_base(cls=(JsonSerializableBase,))

class User(Base):
    #...

Now the User model is magically serializable.

If your framework is not Flask, you can just grab the code.


Answer 6

For security reasons you should never return all the model’s fields. I prefer to selectively choose them.

Flask’s JSON encoding now supports UUID, datetime and relationships. I’ve updated the encoder as follows (it also skips the query and query_class attributes that flask_sqlalchemy adds to the db.Model class):

app/json_encoder.py

from sqlalchemy.ext.declarative import DeclarativeMeta
from flask import json


class AlchemyEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o.__class__, DeclarativeMeta):
            data = {}
            fields = o.__json__() if hasattr(o, '__json__') else dir(o)
            for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                value = o.__getattribute__(field)
                try:
                    json.dumps(value)
                    data[field] = value
                except TypeError:
                    data[field] = None
            return data
        return json.JSONEncoder.default(self, o)

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

With this I can optionally add a __json__ method that returns the list of fields I wish to encode:

app/models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

I add @jsonapi to my view, return the result list, and then my output is as follows:

[
    {
        "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
        "song": {
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        },
        "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
        "type": "audio/mpeg"
    }
]
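
The whitelist idea can be isolated from Flask and SQLAlchemy. Here is a small sketch under that assumption: Song is a hypothetical plain class, WhitelistEncoder is an illustrative name, and only the __json__ convention is taken from the answer.

```python
import json


class Song(object):
    """Hypothetical stand-in for a model, not a real db.Model."""

    def __init__(self, id, title, internal_notes):
        self.id = id
        self.title = title
        self.internal_notes = internal_notes

    def __json__(self):
        # only these attribute names are exposed to clients
        return ['id', 'title']


class WhitelistEncoder(json.JSONEncoder):
    def default(self, o):
        if hasattr(o, '__json__'):
            return {name: getattr(o, name) for name in o.__json__()}
        return json.JSONEncoder.default(self, o)


song = Song(2, 'Cochise', 'do not publish')
encoded = json.dumps(song, cls=WhitelistEncoder, sort_keys=True)
print(encoded)  # {"id": 2, "title": "Cochise"}
```

Because internal_notes is not listed, it never reaches the output, which is the security property the answer is after.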

Answer 7

You can use SQLAlchemy’s introspection like this:

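from flask import jsonify
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import inspect

# `app` below is assumed to be an existing Flask application object.
mysql = SQLAlchemy()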

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

Inspired by an answer to “Convert sqlalchemy row object to python dict”.


Answer 8

A more detailed explanation. In your model, add:

def as_dict(self):
    return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

The str() is for Python 3, so if using Python 2 use unicode(). It should help serialize dates. You can remove it if you are not dealing with those.

You can now query the database like this

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

first() is needed because as_dict() is defined on a model instance, not on the Query object. as_dict() then converts the result to a dict, which is ready to be turned into JSON:

jsonify(some_result)
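
One side effect of wrapping every value in str() is worth seeing before adopting it. The sketch below applies the same transformation to a hypothetical dict of column values (the names and values are made up for illustration); no model or database is involved.

```python
import datetime
import json


# Hypothetical column values, as getattr(self, c.name) might return them.
row = {
    'id': 1,
    'created_at': datetime.datetime(2015, 7, 23, 11, 36, 53),
    'nickname': None,
}

# Same transformation as as_dict(): every value goes through str().
as_strings = {name: str(value) for name, value in row.items()}

encoded = json.dumps(as_strings, sort_keys=True)
print(encoded)
# {"created_at": "2015-07-23 11:36:53", "id": "1", "nickname": "None"}
```

The date is now encodable, but the integer id has become the string "1" and the missing nickname has become the string "None" rather than null. If that matters to the client, converting only the date columns is probably the safer choice.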

Answer 9

It is not so straightforward. I wrote some code to do this. I’m still working on it, and it uses the MochiKit framework. It basically translates compound objects between Python and Javascript using a proxy and registered JSON converters.

The browser side for database objects is db.js. It needs the basic Python proxy source in proxy.js.

On the Python side there is the base proxy module. Then finally the SqlAlchemy object encoder in webserver.py. It also depends on metadata extractors found in the models.py file.


Answer 10

While the original question goes back a while, the number of answers here (and my own experience) suggests it’s a non-trivial question with a lot of different approaches of varying complexity and with different trade-offs.

That’s why I built the SQLAthanor library that extends SQLAlchemy’s declarative ORM with configurable serialization/de-serialization support that you might want to take a look at.

The library supports:

  • Python 2.7, 3.4, 3.5, and 3.6.
  • SQLAlchemy versions 0.9 and higher
  • serialization/de-serialization to/from JSON, CSV, YAML, and Python dict
  • serialization/de-serialization of columns/attributes, relationships, hybrid properties, and association proxies
  • enabling and disabling of serialization for particular formats and columns/relationships/attributes (e.g. you want to support an inbound password value, but never include an outbound one)
  • pre-serialization and post-deserialization value processing (for validation or type coercion)
  • a pretty straightforward syntax that is both Pythonic and seamlessly consistent with SQLAlchemy’s own approach

You can check out the (I hope!) comprehensive docs here: https://sqlathanor.readthedocs.io/en/latest

Hope this helps!


Answer 11

Custom serialization and deserialization.

“from_json” (a class method) builds a Model object from JSON data.

“deserialize” can be called only on an instance; it merges all data from the JSON into that Model instance.

“serialize” performs recursive serialization.

The __write_only__ attribute defines write-only properties (“password_hash”, for example).

from sqlalchemy.orm.attributes import QueryableAttribute


class Serializable(object):
    __exclude__ = ('id',)
    __include__ = ()
    __write_only__ = ()

    @classmethod
    def from_json(cls, json, selfObj=None):
        if selfObj is None:
            self = cls()
        else:
            self = selfObj
        exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
        include = cls.__include__ or ()
        if json:
            for prop, value in json.items():  # use iteritems() on Python 2
                # ignore all non user data, e.g. only
                if (not (prop in exclude) | (prop in include)) and isinstance(
                        getattr(cls, prop, None), QueryableAttribute):
                    setattr(self, prop, value)
        return self

    def deserialize(self, json):
        if not json:
            return None
        return self.__class__.from_json(json, selfObj=self)

    @classmethod
    def serialize_list(cls, object_list=[]):
        output = []
        for li in object_list:
            if isinstance(li, Serializable):
                output.append(li.serialize())
            else:
                output.append(li)
        return output

    def serialize(self, **kwargs):

        # init write only props
        if len(getattr(self.__class__, '__write_only__', ())) == 0:
            self.__class__.__write_only__ = ()
        dictionary = {}
        expand = kwargs.get('expand', ()) or ()
        prop = 'props'
        if expand:
            # expand all the fields
            for key in expand:
                getattr(self, key)
        iterable = self.__dict__.items()
        is_custom_property_set = False
        # include only properties passed as parameter
        if (prop in kwargs) and (kwargs.get(prop, None) is not None):
            is_custom_property_set = True
            iterable = kwargs.get(prop, None)
        # loop trough all accessible properties
        for key in iterable:
            accessor = key
            if isinstance(key, tuple):
                accessor = key[0]
            if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                # force select from db to be able get relationships
                if is_custom_property_set:
                    getattr(self, accessor, None)
                if isinstance(self.__dict__.get(accessor), list):
                    dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                # check if those properties are read only
                elif isinstance(self.__dict__.get(accessor), Serializable):
                    dictionary[accessor] = self.__dict__.get(accessor).serialize()
                else:
                    dictionary[accessor] = self.__dict__.get(accessor)
        return dictionary

Answer 12

Here is a solution that lets you select the relations you want to include in your output, as deep as you would like to go. NOTE: This is a complete rewrite that takes a dict or str as its argument rather than a list, and it fixes a few issues.

def deep_dict(self, relations=None):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations={'family':None})
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations={'family':{'homes':None}})
            OR
            person.deep_dict(relations={'family':'homes'})
        Say homes has a relation like rooms you can do
            person.deep_dict(relations={'family':{'homes':'rooms'}})
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, (dict, str)):
        raise TypeError("relations should be a dict or str, it is of type {}".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

So, for the person/family/homes/rooms example, turning it into JSON takes just:

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))
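
To see what the relations argument does without setting up a database, here is a minimal sketch that runs the same kind of recursion on plain Python objects standing in for mapped instances. The Node class and the sample data are hypothetical, and the method is a condensed variant written for illustration, not the answer's exact code (for one thing, it leaves nested objects out of the flat output instead of stringifying them).

```python
import json


class Node:
    """Hypothetical stand-in for a mapped object (no SQLAlchemy involved)."""

    def __init__(self, **attrs):
        self.__dict__.update(attrs)

    def deep_dict(self, relations=None):
        # Scalar attributes are stringified; nested objects and lists are
        # skipped here and only added back when explicitly requested.
        out = {k: str(v) for k, v in self.__dict__.items()
               if not isinstance(v, (Node, list))}
        if not relations:
            return out
        if isinstance(relations, str):
            relations = {relations: None}  # 'homes' means {'homes': None}
        for name, deeper in relations.items():
            rel = getattr(self, name)
            if isinstance(rel, list):
                out[name] = [r.deep_dict(deeper) for r in rel]
            else:
                out[name] = rel.deep_dict(deeper)
        return out


home = Node(id=1, city="Oslo", rooms=[Node(id=7, name="kitchen")])
family = Node(id=2, surname="Hansen", homes=[home])
person = Node(id=3, name="Ada", family=family)

flat = person.deep_dict()
nested = person.deep_dict({"family": {"homes": "rooms"}})
print(json.dumps(nested, indent=2))
```

Each level of the relations spec names the attribute to follow, and a trailing string is shorthand for a one-key dict, which is why {'family': {'homes': 'rooms'}} reaches three levels down.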

Answer 13

def alc2json(row):
    return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

I thought I’d play a little code golf with this one.

FYI: I am using automap_base since we have a separately designed schema according to business requirements. I just started using SQLAlchemy today, but the documentation states that automap_base is an extension to declarative_base, which seems to be the typical paradigm in the SQLAlchemy ORM, so I believe this should work.

It does not follow foreign keys the way Tjorriemorrie's solution does; it simply matches columns to values and handles Python types by calling str() on each column value. Our values include Python datetime.time and decimal.Decimal results, so it gets the job done.

Hope this helps any passers-by!


Answer 14

I know this is quite an old post. I took the solution given by @SashaB and modified it to fit my needs.

I added the following:

  1. Field ignore list: a list of fields to be ignored while serializing.
  2. Field replace list: a dictionary mapping field names to the names that should replace them in the serialized output.
  3. Methods and BaseQuery objects are no longer serialized.

My code is as follows:

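import json

from sqlalchemy.ext.declarative import DeclarativeMeta
# Assumption: BaseQuery is Flask-SQLAlchemy's query class (importable like this in Flask-SQLAlchemy 2.x)
from flask_sqlalchemy import BaseQuery


def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
   """
   Serialize SQLAlchemy result into JSON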
   :param revisit_self: True / False
   :param fields_to_expand: Fields which are to be expanded for including their children and all
   :param fields_to_ignore: Fields to be ignored while encoding
   :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
   :return: Json serialized SQLAlchemy object
   """
   _visited_objs = []
   class AlchemyEncoder(json.JSONEncoder):
      def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # don't re-visit self
            if revisit_self:
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

            # go through each field in this SQLalchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                val = obj.__getattribute__(field)
                # is this field a method definition, or an SQLAlchemy query object?
                if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                    field_name = fields_to_replace[field] if field in fields_to_replace else field
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or \
                            (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field_name] = None
                            continue

                    fields[field_name] = val
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)
   return AlchemyEncoder

Hope it helps someone!
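
The factory pattern used here (a function that returns an encoder class configured through closure variables) can be tried out without a database. The following is a minimal sketch on a plain Python class; make_encoder, PlainObjectEncoder and Product are hypothetical names, and the rule "serialize public, non-callable attributes" is an assumption made for the example rather than what the answer's encoder does for mapped classes.

```python
import json


def make_encoder(fields_to_ignore=(), fields_to_replace=None):
    """Build a JSONEncoder subclass configured through closure variables."""
    fields_to_replace = fields_to_replace or {}

    class PlainObjectEncoder(json.JSONEncoder):
        def default(self, obj):
            # Hypothetical rule: any object with a __dict__ is serialized
            # from its public, non-callable attributes.
            if hasattr(obj, "__dict__"):
                fields = {}
                for name, value in vars(obj).items():
                    if name.startswith("_") or name in fields_to_ignore:
                        continue
                    if callable(value):
                        continue
                    # Emit the replacement key if one was configured.
                    fields[fields_to_replace.get(name, name)] = value
                return fields
            return super().default(obj)

    return PlainObjectEncoder


class Product:
    """Hypothetical stand-in for a mapped class."""

    def __init__(self, id, name, secret):
        self.id = id
        self.name = name
        self.secret = secret


encoder = make_encoder(fields_to_ignore=["secret"],
                       fields_to_replace={"name": "title"})
encoded = json.dumps(Product(3, "some name", "hidden"), cls=encoder)
print(encoded)
```

With the answer's own factory, the call has the same shape: json.dumps(obj, cls=alchemy_json_encoder(fields_to_ignore=[...], fields_to_replace={...})).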


Answer 15

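Use the built-in serializer in SQLAlchemy. Note that it is pickle-based and returns bytes rather than JSON, so it suits passing objects between Python processes more than feeding a JavaScript client: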

from sqlalchemy.ext.serializer import loads, dumps
obj = MyAlchemyObject()
# serialize object
serialized_obj = dumps(obj)

# deserialize object
obj = loads(serialized_obj)

If you’re transferring the object between sessions, remember to detach the object from the current session using session.expunge(obj). To attach it again, just do session.add(obj).


Answer 16

The following code serializes a SQLAlchemy result to JSON.

import json
from collections import OrderedDict


def asdict(self):
    result = OrderedDict()
    for key in self.__mapper__.c.keys():
        if getattr(self, key) is not None:
            result[key] = str(getattr(self, key))
        else:
            result[key] = getattr(self, key)
    return result


def to_array(all_vendors):
    v = [ ven.asdict() for ven in all_vendors ]
    return json.dumps(v) 

Calling function:

def all_products():
    all_products = Products.query.all()
    return to_array(all_products)

Answer 17

The AlchemyEncoder is wonderful, but it sometimes fails on Decimal values. Here is an improved encoder that solves the Decimal problem:

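import json
from decimal import Decimal

from sqlalchemy.ext.declarative import DeclarativeMeta


class AlchemyEncoder(json.JSONEncoder):
    # To serialize SQLAlchemy objects
    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            model_fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                # convert Decimal columns up front; the plain json.dumps probe
                # below would reject them and the field would end up as None
                if isinstance(data, Decimal):
                    data = float(data)
                try:
                    json.dumps(data)  # this will fail on non-encodable values, like other classes
                    model_fields[field] = data
                except TypeError:
                    model_fields[field] = None
            return model_fields
        # Decimal values that reach the encoder outside of a model
        if isinstance(obj, Decimal):
            return float(obj)
        return json.JSONEncoder.default(self, obj)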
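
The Decimal handling can be checked in isolation with the standard library alone. This is a minimal sketch; DecimalEncoder and the sample row are hypothetical, and the choice of float() mirrors the answer, although str() is an option when exact precision matters.

```python
import json
from decimal import Decimal


class DecimalEncoder(json.JSONEncoder):
    """Encode Decimal values, which the stock encoder rejects."""

    def default(self, obj):
        if isinstance(obj, Decimal):
            # float() is convenient but may lose precision; str(obj) is the
            # lossless alternative if the client can parse strings.
            return float(obj)
        return super().default(obj)


row = {"id": 3, "price": Decimal("19.99")}

try:
    json.dumps(row)
    stock_encoder_failed = False
except TypeError:
    stock_encoder_failed = True

encoded = json.dumps(row, cls=DecimalEncoder)
print(stock_encoder_failed, encoded)
```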

Answer 18

When using SQLAlchemy to connect to a database, this is a simple and highly configurable solution: use pandas.

import pandas as pd
import sqlalchemy

#sqlalchemy engine configuration
engine = sqlalchemy.create_engine....

def my_function():
  #read in from sql directly into a pandas dataframe
  #check the pandas documentation for additional config options
  sql_DF = pd.read_sql_table("table_name", con=engine)

  # "orient" is optional here but allows you to specify the json formatting you require
  sql_json = sql_DF.to_json(orient="index")

  return sql_json


Answer 19

Under Flask, this works and handles datetime fields, transforming a field such as
'time': datetime.datetime(2018, 3, 22, 15, 40) into
"time": "2018-03-22 15:40:00":

obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

# This to get the JSON body
return json.dumps(obj)

# Or this to get a response object
return jsonify(obj)
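
One side effect of calling str() on every column is that numbers and NULLs become strings too. As a hedged alternative, the sketch below passes a fallback function to json.dumps through its default parameter, so only the types the encoder cannot handle are converted. The json_default helper and the sample row are hypothetical, and the ISO 8601 output differs slightly from the str() format shown above.

```python
import json
from datetime import date, datetime, time


def json_default(value):
    """Fallback used by json.dumps for types it cannot encode itself."""
    if isinstance(value, (datetime, date, time)):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


# Hypothetical row, as a column-to-value dict comprehension might return it.
row = {"id": 1, "time": datetime(2018, 3, 22, 15, 40), "note": None}

# str() on everything: the id becomes "1" and None becomes the string "None".
stringified = json.dumps({k: str(v) for k, v in row.items()})
# default= hook: ints stay ints, None becomes null, datetimes become ISO strings.
typed = json.dumps(row, default=json_default)
print(stringified)
print(typed)
```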

Answer 20

The built-in serializer chokes on some inputs with a "utf-8 cannot decode invalid start byte" error. Instead, I went with:

def row_to_dict(row):
    temp = dict(row.__dict__)  # copy, so the mapped instance keeps its internal state
    temp.pop('_sa_instance_state', None)
    return temp


def rows_to_list(rows):
    ret_rows = []
    for row in rows:
        ret_rows.append(row_to_dict(row))
    return ret_rows


@website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
def some_api():
    '''
    /some_endpoint
    '''
    rows = rows_to_list(SomeModel.query.all())
    response = app.response_class(
        response=jsonplus.dumps(rows),
        status=200,
        mimetype='application/json'
    )
    return response

Answer 21

Maybe you can use a class like this:

from typing import Any, Dict

from sqlalchemy import Table
from sqlalchemy.ext.declarative import declarative_base, declared_attr


class Custom:
    """Some custom logic here!"""

    __table__: Table  # def for mypy

    @declared_attr
    def __tablename__(cls):  # pylint: disable=no-self-argument
        return cls.__name__  # pylint: disable= no-member

    def to_dict(self) -> Dict[str, Any]:
        """Serializes only column data."""
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

Base = declarative_base(cls=Custom)

class MyOwnTable(Base):
    #COLUMNS!

With that, all mapped objects have the to_dict method.


Answer 22

When working with raw SQL and unmapped objects, cursor.description gave me what I was looking for:

with connection.cursor() as cur:
    print(query)
    cur.execute(query)
    for item in cur.fetchall():
        row = {column.name: item[i] for i, column in enumerate(cur.description)}
        print(row)
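
The column.name access above relies on a driver whose description entries expose a name attribute (psycopg2 is one such driver). As a minimal sketch of the same idea with the standard library's sqlite3 module, where each description entry is a plain 7-tuple, the name is taken from index 0 instead. The table and data are hypothetical.

```python
import json
import sqlite3

# In-memory database with a hypothetical table, for illustration only.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE product (id INTEGER, name TEXT)")
connection.execute("INSERT INTO product VALUES (3, 'some name')")

cur = connection.cursor()
cur.execute("SELECT id, name FROM product")
# With sqlite3, each description entry is a 7-tuple; the name is item 0.
columns = [entry[0] for entry in cur.description]
rows = [dict(zip(columns, item)) for item in cur.fetchall()]
cur.close()
connection.close()

encoded = json.dumps(rows)
print(encoded)
```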

Answer 23

step1:
class CNAME:
   ...
   def as_dict(self):
       return {item.name: getattr(self, item.name) for item in self.__table__.columns}

step2:
rows = []  # avoid shadowing the built-in list
for data in session.query(CNAME).all():
    rows.append(data.as_dict())

step3:
return jsonify(rows)

Answer 24

My take, using (too many?) dictionaries:

def serialize(_query):
    # d = dictionary written to per row
    # D = dictionary d is written to each time, then reset
    # Master = dictionary of dictionaries; the id key (int, unique from database)
    # from D is used as the key for the dictionary D entry in Master
    Master = {}
    D = {}
    x = 0
    for u in _query:
        d = u.__dict__
        D = {}
        for n in d.keys():
            if n != '_sa_instance_state':
                D[n] = d[n]
        x = d['id']
        Master[x] = D
    return Master

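I run this with Flask (including jsonify) and flask_sqlalchemy to output JSON.

Call the function with jsonify(serialize(query)), where query is the query result to serialize. Each row needs an id attribute, since it is used as the key.

It works with all the SQLAlchemy queries I’ve tried so far (running SQLite3).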