问题:如何在Flask-SQLAlchemy应用中执行原始SQL
如何在SQLAlchemy中执行原始SQL?
我有一个在烧瓶上运行的python Web应用程序,并通过SQLAlchemy连接到数据库。
我需要一种运行原始SQL的方法。该查询涉及多个表联接以及内联视图。
我试过了:
connection = db.session.connection()
connection.execute( <sql here> )但是我不断收到网关错误。
回答 0
你有没有尝试过:
result = db.engine.execute("<sql here>")要么:
from sqlalchemy import text
sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names回答 1
SQL Alchemy会话对象具有自己的execute方法:
result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})您的所有应用程序查询都应通过会话对象,无论它们是否是原始SQL。这样可以确保由事务适当地管理查询,该事务允许将同一请求中的多个查询作为一个单元提交或回滚。使用引擎或连接进行事务处理将使您面临更大的隐患,即可能很难检测到可能导致数据损坏的错误的风险。每个请求应仅与一个事务相关联,并且使用db.session可以确保您的应用程序是这种情况。
另请注意,它execute是为参数化查询设计的。使用:val示例中的参数作为查询的任何输入,以保护自己免受SQL注入攻击。您可以通过传递a dict作为第二个参数来提供这些参数的值,其中每个键都是在查询中显示的参数名称。参数本身的确切语法可能会有所不同,具体取决于您的数据库,但是所有主要的关系数据库都以某种形式支持它们。
假设这是一个SELECT查询,它将返回一个可迭代的RowProxy对象。
您可以使用多种技术访问各个列:
for r in result:
    print(r[0]) # Access by positional index
    print(r['my_column']) # Access by column name as a string
    r_dict = dict(r.items()) # convert to dict keyed by column names就个人而言,我更喜欢将结果转换为namedtuples:
from collections import namedtuple
Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
    print(r.my_column)
    print(r)如果您不使用Flask-SQLAlchemy扩展,您仍然可以轻松使用会话:
import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session
engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))
s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})回答 2
docs:SQL表达式语言教程-使用文本
例:
from sqlalchemy.sql import text
connection = engine.connect()
# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)
# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
                  text('select * from Employees where EmployeeGroup = :group'), 
                  group = employeeGroup)
# or - notice the requirement to quote 'Staff'
employees = connection.execute(
                  text("select * from Employees where EmployeeGroup = 'Staff'"))
for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')回答 3
你可以使用SELECT SQL查询的结果from_statement(),并text()如图所示这里。您不必以这种方式处理元组。作为User具有表名的类的示例,users您可以尝试,
from sqlalchemy.sql import text
.
.
.
user = session.query(User).from_statement(
    text("SELECT * FROM users where name=:name")).\
    params(name='ed').all()
return user回答 4
result = db.engine.execute(text("<sql here>"))<sql here>除非您处于autocommit模式,否则执行但不提交。因此,插入和更新不会反映在数据库中。
要在更改后提交,请执行
result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))回答 5
这是如何从Flask Shell运行SQL查询的简化答案
首先,映射您的模块(如果您的模块/应用程序是principal文件夹中的manage.py,并且您在UNIX操作系统中),请运行:
export FLASK_APP=manage运行烧瓶壳
flask shell导入我们需要的::
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
from sqlalchemy import text运行查询:
result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))这将使用具有应用程序的当前数据库连接。
回答 6
您是否尝试过按照文档connection.execute(text( <sql here> ), <bind params here> )所述使用和绑定参数?这可以帮助解决许多参数格式设置和性能问题。也许网关错误是超时?绑定参数往往会使复杂的查询执行得更快。
回答 7
如果你想避免的元组,另一种方式是通过调用first,one或all方法:
query = db.engine.execute("SELECT * FROM blogs "
                           "WHERE id = 1 ")
assert query.first().name == "Welcome to my blog"
