问题:如何在Flask-SQLAlchemy应用中执行原始SQL
如何在SQLAlchemy中执行原始SQL?
我有一个在烧瓶上运行的python Web应用程序,并通过SQLAlchemy连接到数据库。
我需要一种运行原始SQL的方法。该查询涉及多个表联接以及内联视图。
我试过了:
connection = db.session.connection()
connection.execute( <sql here> )
但是我不断收到网关错误。
回答 0
你有没有尝试过:
result = db.engine.execute("<sql here>")
要么:
from sqlalchemy import text
sql = text('select name from penguins')
result = db.engine.execute(sql)
names = [row[0] for row in result]
print names
回答 1
SQL Alchemy会话对象具有自己的execute
方法:
result = db.session.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
您的所有应用程序查询都应通过会话对象,无论它们是否是原始SQL。这样可以确保由事务适当地管理查询,该事务允许将同一请求中的多个查询作为一个单元提交或回滚。使用引擎或连接进行事务处理将使您面临更大的隐患,即可能很难检测到可能导致数据损坏的错误的风险。每个请求应仅与一个事务相关联,并且使用db.session
可以确保您的应用程序是这种情况。
另请注意,它execute
是为参数化查询设计的。使用:val
示例中的参数作为查询的任何输入,以保护自己免受SQL注入攻击。您可以通过传递a dict
作为第二个参数来提供这些参数的值,其中每个键都是在查询中显示的参数名称。参数本身的确切语法可能会有所不同,具体取决于您的数据库,但是所有主要的关系数据库都以某种形式支持它们。
假设这是一个SELECT
查询,它将返回一个可迭代的RowProxy
对象。
您可以使用多种技术访问各个列:
for r in result:
print(r[0]) # Access by positional index
print(r['my_column']) # Access by column name as a string
r_dict = dict(r.items()) # convert to dict keyed by column names
就个人而言,我更喜欢将结果转换为namedtuple
s:
from collections import namedtuple
Record = namedtuple('Record', result.keys())
records = [Record(*r) for r in result.fetchall()]
for r in records:
print(r.my_column)
print(r)
如果您不使用Flask-SQLAlchemy扩展,您仍然可以轻松使用会话:
import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session
engine = sqlalchemy.create_engine('my connection string')
Session = scoped_session(sessionmaker(bind=engine))
s = Session()
result = s.execute('SELECT * FROM my_table WHERE my_column = :val', {'val': 5})
回答 2
docs:SQL表达式语言教程-使用文本
例:
from sqlalchemy.sql import text
connection = engine.connect()
# recommended
cmd = 'select * from Employees where EmployeeGroup = :group'
employeeGroup = 'Staff'
employees = connection.execute(text(cmd), group = employeeGroup)
# or - wee more difficult to interpret the command
employeeGroup = 'Staff'
employees = connection.execute(
text('select * from Employees where EmployeeGroup = :group'),
group = employeeGroup)
# or - notice the requirement to quote 'Staff'
employees = connection.execute(
text("select * from Employees where EmployeeGroup = 'Staff'"))
for employee in employees: logger.debug(employee)
# output
(0, 'Tim', 'Gurra', 'Staff', '991-509-9284')
(1, 'Jim', 'Carey', 'Staff', '832-252-1910')
(2, 'Lee', 'Asher', 'Staff', '897-747-1564')
(3, 'Ben', 'Hayes', 'Staff', '584-255-2631')
回答 3
你可以使用SELECT SQL查询的结果from_statement()
,并text()
如图所示这里。您不必以这种方式处理元组。作为User
具有表名的类的示例,users
您可以尝试,
from sqlalchemy.sql import text
.
.
.
user = session.query(User).from_statement(
text("SELECT * FROM users where name=:name")).\
params(name='ed').all()
return user
回答 4
result = db.engine.execute(text("<sql here>"))
<sql here>
除非您处于autocommit
模式,否则执行但不提交。因此,插入和更新不会反映在数据库中。
要在更改后提交,请执行
result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))
回答 5
这是如何从Flask Shell运行SQL查询的简化答案
首先,映射您的模块(如果您的模块/应用程序是principal文件夹中的manage.py,并且您在UNIX操作系统中),请运行:
export FLASK_APP=manage
运行烧瓶壳
flask shell
导入我们需要的::
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
from sqlalchemy import text
运行查询:
result = db.engine.execute(text("<sql here>").execution_options(autocommit=True))
这将使用具有应用程序的当前数据库连接。
回答 6
您是否尝试过按照文档connection.execute(text( <sql here> ), <bind params here> )
所述使用和绑定参数?这可以帮助解决许多参数格式设置和性能问题。也许网关错误是超时?绑定参数往往会使复杂的查询执行得更快。
回答 7
如果你想避免的元组,另一种方式是通过调用first
,one
或all
方法:
query = db.engine.execute("SELECT * FROM blogs "
"WHERE id = 1 ")
assert query.first().name == "Welcome to my blog"