Tag archive: sqlalchemy

Group by and count function in sqlalchemy

Question: Group by and count function in sqlalchemy

I want a “group by and count” command in sqlalchemy. How can I do this?


Answer 0

The documentation on counting says that for group_by queries it is better to use func.count():

from sqlalchemy import func
session.query(Table.column, func.count(Table.column)).group_by(Table.column).all()

Answer 1

If you are using the Table.query property:

from sqlalchemy import func
Table.query.with_entities(Table.column, func.count(Table.column)).group_by(Table.column).all()

If you are using the session.query() method (as stated in miniwark’s answer):

from sqlalchemy import func
session.query(Table.column, func.count(Table.column)).group_by(Table.column).all()

Answer 2

You can also count on multiple groups and their intersection:

self.session.query(func.count(Table.column1),Table.column1, Table.column2).group_by(Table.column1, Table.column2).all()

The query above will return counts for every combination of values from the two columns that actually occurs in the table.
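For anyone who wants to try the answers above end to end, here is a minimal sketch against an in-memory SQLite database. The Pet model and its data are made up for illustration, and it assumes SQLAlchemy 1.4 or newer (for sqlalchemy.orm.declarative_base and Session as a context manager).

```python
from sqlalchemy import Column, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Pet(Base):
    # Hypothetical table used only for this illustration.
    __tablename__ = "pets"
    id = Column(Integer, primary_key=True)
    species = Column(String)
    color = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        Pet(species="cat", color="black"),
        Pet(species="cat", color="black"),
        Pet(species="cat", color="white"),
        Pet(species="dog", color="black"),
    ])
    session.commit()

    # One group per distinct species.
    by_species = (
        session.query(Pet.species, func.count(Pet.id))
        .group_by(Pet.species)
        .order_by(Pet.species)
        .all()
    )

    # One group per (species, color) pair that occurs in the data.
    by_pair = (
        session.query(Pet.species, Pet.color, func.count(Pet.id))
        .group_by(Pet.species, Pet.color)
        .order_by(Pet.species, Pet.color)
        .all()
    )

species_counts = [tuple(row) for row in by_species]
pair_counts = [tuple(row) for row in by_pair]
print(species_counts)
print(pair_counts)
```

Note that the pair ("dog", "white") does not show up at all: GROUP BY only produces groups for rows that exist.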


SQLAlchemy: how to filter date field?

Question: SQLAlchemy: how to filter date field?

Here is model:

class User(Base):
    ...
    birthday = Column(Date, index=True)   #in database it's like '1987-01-17'
    ...

I want to filter between two dates, for example to select all users aged 18 to 30.

How can I implement this with SQLAlchemy?

I think of:

query = DBSession.query(User).filter(
    and_(User.birthday >= '1988-01-17', User.birthday <= '1985-01-17')
) 

# means age >= 24 and age <= 27

I know this is not correct, but what is the correct way to do it?


Answer 0

In fact, your query is right except for a typo: as written, the filter excludes all records. Swap <= and >=:

from sqlalchemy import and_

qry = DBSession.query(User).filter(
        and_(User.birthday <= '1988-01-17', User.birthday >= '1985-01-17'))
# or, equivalently:
qry = DBSession.query(User).filter(User.birthday <= '1988-01-17').\
        filter(User.birthday >= '1985-01-17')

Also you can use between:

qry = DBSession.query(User).filter(User.birthday.between('1985-01-17', '1988-01-17'))
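The question asked about an age range (18 to 30) rather than fixed dates, so the two bounds passed to between() have to be computed first. Here is a small sketch of that step using only the standard library. The helper names years_ago and birthday_bounds are made up for this example, and today is passed in explicitly so the result is reproducible.

```python
from datetime import date, timedelta


def years_ago(years, today):
    """Return the calendar date `years` years before `today`."""
    try:
        return today.replace(year=today.year - years)
    except ValueError:
        # 29 February with a non-leap target year: fall back to 28 February.
        return today.replace(year=today.year - years, day=28)


def birthday_bounds(min_age, max_age, today):
    """Inclusive (earliest, latest) birthdays for ages min_age..max_age."""
    # Anyone born on or before (max_age + 1) years ago is already too old,
    # so the earliest allowed birthday is the day after that date.
    earliest = years_ago(max_age + 1, today) + timedelta(days=1)
    # Anyone born after min_age years ago is still too young.
    latest = years_ago(min_age, today)
    return earliest, latest


earliest, latest = birthday_bounds(18, 30, today=date(2024, 1, 17))
print(earliest, latest)
```

The two dates can then go straight into the filter from the answer, e.g. User.birthday.between(earliest, latest).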

Answer 1

from app import SQLAlchemyDB as db

Chance.query.filter(Chance.repo_id==repo_id, 
                    Chance.status=="1", 
                    db.func.date(Chance.apply_time)<=end, 
                    db.func.date(Chance.apply_time)>=start).count()

This is equivalent to:

select
   count(id)
from
   Chance
where
   repo_id = :repo_id
   and status = '1'
   and date(apply_time) <= :end
   and date(apply_time) >= :start

Hope this helps.


Answer 2

If you want to cover the whole period:

    from sqlalchemy import and_, func

    query = DBSession.query(User).filter(and_(func.date(User.birthday) >= '1985-01-17',
                                              func.date(User.birthday) <= '1988-01-17'))

That means the range 1985-01-17 00:00 to 1988-01-17 23:59.


sqlalchemy IS NOT NULL select

Question: sqlalchemy IS NOT NULL select

How can I add the filter as in SQL to select values that are NOT NULL from a certain column ?

SELECT * 
FROM table 
WHERE YourColumn IS NOT NULL;

How can I do the same with SQLAlchemy filters?

select = select(table).select_from(table).where(all_filters) 

Answer 0

column_obj != None will produce an IS NOT NULL constraint:

In a column context, produces the clause a != b. If the target is None, produces an IS NOT NULL.

or use isnot() (new in 0.7.9):

Implement the IS NOT operator.

Normally, IS NOT is generated automatically when comparing to a value of None, which resolves to NULL. However, explicit usage of IS NOT may be desirable if comparing to boolean values on certain platforms.

Demo:

>>> from sqlalchemy.sql import column
>>> column('YourColumn') != None
<sqlalchemy.sql.elements.BinaryExpression object at 0x10c8d8b90>
>>> str(column('YourColumn') != None)
'"YourColumn" IS NOT NULL'
>>> column('YourColumn').isnot(None)
<sqlalchemy.sql.elements.BinaryExpression object at 0x104603850>
>>> str(column('YourColumn').isnot(None))
'"YourColumn" IS NOT NULL'

Answer 1

Starting in version 0.7.9 you can use the filter operator .isnot instead of comparing constraints, like this:

query.filter(User.name.isnot(None))

This method is only necessary if pep8 is a concern.

source: sqlalchemy documentation
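To see the filter doing its job on real rows, here is a minimal sketch against an in-memory SQLite database. The people table is hypothetical, and the select(...) call style assumes SQLAlchemy 1.4 or newer. (In 1.4 the method was also given the name is_not(); isnot() is used here because it matches the answers above.)

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

metadata = MetaData()
# Hypothetical table used only for this illustration.
people = Table(
    "people",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("nickname", String, nullable=True),
)

engine = create_engine("sqlite:///:memory:")
metadata.create_all(engine)

# Keep only the rows whose nickname is not NULL.
stmt = select(people.c.id).where(people.c.nickname.isnot(None)).order_by(people.c.id)

with engine.begin() as conn:
    conn.execute(people.insert(), [
        {"id": 1, "nickname": "Al"},
        {"id": 2, "nickname": None},
        {"id": 3, "nickname": "Bo"},
    ])
    ids = [row[0] for row in conn.execute(stmt)]

print(str(stmt))
print(ids)
```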


Answer 2

In case anyone else is wondering, you can use is_ to generate foo IS NULL:

>>> from sqlalchemy.sql import column
>>> print(column('foo').is_(None))
foo IS NULL
>>> print(column('foo').isnot(None))
foo IS NOT NULL

Target database is not up to date

Question: Target database is not up to date

I’d like to make a migration for a Flask app. I am using Alembic.

However, I receive the following error.

Target database is not up to date.

Online, I read that it has something to do with this. http://alembic.zzzcomputing.com/en/latest/cookbook.html#building-an-up-to-date-database-from-scratch

Unfortunately, I don’t quite understand how to get the database up to date and where/how I should write the code given in the link. If you have experience with migrations, can you please explain this for me

Thanks


Answer 0

After creating a migration, either manually or with --autogenerate, you must apply it with alembic upgrade head. If you used db.create_all() from a shell, you can use alembic stamp head to indicate that the current state of the database represents the application of all migrations.
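If you would rather drive this from Python than from the command line, Alembic exposes the same commands in alembic.command. The following is only a sketch: the function name bring_database_to_head and its already_created flag are made up for this example, and it assumes an alembic.ini in the working directory.

```python
def bring_database_to_head(ini_path="alembic.ini", already_created=False):
    """Apply pending migrations, or only record them as applied."""
    # Imported lazily so this module can be loaded without Alembic installed.
    from alembic import command
    from alembic.config import Config

    config = Config(ini_path)
    if already_created:
        # Same as `alembic stamp head`: records the revision, runs no DDL.
        command.stamp(config, "head")
    else:
        # Same as `alembic upgrade head`: runs every pending migration.
        command.upgrade(config, "head")
```

Use already_created=True only when the schema really matches the latest migration (for example right after db.create_all()); otherwise the recorded revision and the actual tables will disagree.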


Answer 1

This worked for me:

$ flask db stamp head
$ flask db migrate
$ flask db upgrade

Answer 2

My situation is like the one in this question. When I executed “./manage.py db migrate -m 'Add relationship'”, this error occurred: “alembic.util.exc.CommandError: Target database is not up to date.”

So I checked the status of my migrate:

(venv) ]#./manage.py db heads
d996b44eca57 (head)
(venv) ]#./manage.py db current
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
715f79abbd75

and found that the heads and the current revision are different!

I fixed it with the following step:

(venv)]#./manage.py db stamp heads
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision 715f79abbd75 -> d996b44eca57

And now the current revision is the same as the head:

(venv) ]#./manage.py db current
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
d996b44eca57 (head)

And now I can run the migration again.


Answer 3

This can be solved in several ways:

1. Delete the latest migration file (a Python file), then try to perform the migration afresh.

If the issue still persists, try these commands:

$ flask db stamp head  # To set the revision in the database to the head, without performing any migrations. You can change head to the required change you want.
$ flask db migrate     # To detect automatically all the changes.
$ flask db upgrade     # To apply all the changes.

Answer 4

I had to delete some of my migration files for some reason. Not sure why. But that fixed the problem, kind of.

One issue is that the database ends up getting updated properly, with all the new tables, etc, but the migration files themselves don’t show any changes when I use automigrate.

If someone has a better solution, please let me know, as right now my solution is kind of hacky.


Answer 5

$ flask db stamp head  # To set the revision in the database to the head, without performing any migrations. You can change head to the required change you want.
$ flask db migrate  # To detect automatically all the changes.
$ flask db upgrade  # To apply all the changes.

You can find more info at the documentation https://flask-migrate.readthedocs.io/en/latest/


Answer 6

I also ran into differing heads, and I wanted to change one of the fields from string to integer, so I first ran:

$ flask db stamp head # to make the current the same
$ flask db migrate
$ flask db upgrade

It’s solved now!


Answer 7

This can also happen if you, like myself, have just started a new project and you are using in-memory SQLite database (sqlite:///:memory:). If you apply a migration on such a database, obviously the next time you want to say auto-generate a revision, the database will still be in its original state (empty), so alembic will be complaining that the target database is not up to date. The solution is to switch to a persisted database.
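The underlying behaviour is easy to reproduce with the standard library alone: every new connection to :memory: gets its own empty database. The sketch below imitates Alembic's alembic_version bookkeeping table by hand; the revision id is just the one quoted elsewhere in this thread.

```python
import sqlite3


def table_names(conn):
    """List the tables visible on this connection."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [name for (name,) in rows]


# First "run": a migration is applied and the revision is recorded.
first = sqlite3.connect(":memory:")
first.execute("CREATE TABLE alembic_version (version_num TEXT)")
first.execute("INSERT INTO alembic_version VALUES ('d996b44eca57')")
first.commit()

# Second "run": a new connection to :memory: is a brand-new, empty database.
second = sqlite3.connect(":memory:")

tables_in_first = table_names(first)
tables_in_second = table_names(second)
print(tables_in_first, tables_in_second)

first.close()
second.close()
```

With a file-based URL such as sqlite:///app.db the second connection would see the table created by the first.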


Answer 8

To fix this error, delete the latest migration file (a Python file), then try to perform the migration afresh.


Answer 9

Try dropping all tables before executing the db upgrade command.


Answer 10

To solve this, I dropped (deleted) the tables in the migration and ran these commands:

flask db migrate

and

flask db upgrade

How to create a new database using SQLAlchemy?

Question: How to create a new database using SQLAlchemy?

Using SQLAlchemy, an Engine object is created like this:

from sqlalchemy import create_engine
engine = create_engine("postgresql://localhost/mydb")

Accessing engine fails if the database specified in the argument to create_engine (in this case, mydb) does not exist. Is it possible to tell SQLAlchemy to create a new database if the specified database doesn’t exist?


Answer 0

On postgres, three databases are normally present by default. If you are able to connect as a superuser (e.g. the postgres role), then you can connect to the postgres or template1 databases. The default pg_hba.conf permits only the unix user named postgres to use the postgres role, so the simplest thing is to just become that user. At any rate, create an engine as usual with a user that has the permissions to create a database (note that SQLAlchemy 1.4 and later only accept the postgresql:// scheme, not postgres://):

>>> engine = sqlalchemy.create_engine("postgresql://postgres@/postgres")

You cannot use engine.execute() however, because postgres does not allow you to create databases inside transactions, and sqlalchemy always tries to run queries in a transaction. To get around this, get the underlying connection from the engine:

>>> conn = engine.connect()

But the connection will still be inside a transaction, so you have to end the open transaction with a commit:

>>> conn.execute("commit")

And you can then proceed to create the database using the proper PostgreSQL command for it.

>>> conn.execute("create database test")
>>> conn.close()

Answer 1

SQLAlchemy-Utils provides custom data types and various utility functions for SQLAlchemy. You can install the most recent official version using pip:

pip install sqlalchemy-utils

The database helpers include a create_database function:

from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database

engine = create_engine("postgresql://localhost/mydb")
if not database_exists(engine.url):
    create_database(engine.url)

print(database_exists(engine.url))

Answer 2

It’s possible to avoid manual transaction management while creating database by providing isolation_level='AUTOCOMMIT' to create_engine function:

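import sqlalchemy

with sqlalchemy.create_engine(
    'postgresql:///postgres',
    isolation_level='AUTOCOMMIT'
).connect() as connection:
    # text() is required for raw SQL strings in SQLAlchemy 2.0 and also works in 1.x
    connection.execute(sqlalchemy.text('CREATE DATABASE my_database'))

Also, if you are not sure whether the database already exists, you can ignore the creation error caused by an existing database by suppressing the sqlalchemy.exc.ProgrammingError exception:

import contextlib
import sqlalchemy
import sqlalchemy.exc

engine = sqlalchemy.create_engine('postgresql:///postgres', isolation_level='AUTOCOMMIT')
with contextlib.suppress(sqlalchemy.exc.ProgrammingError), engine.connect() as connection:
    connection.execute(sqlalchemy.text('CREATE DATABASE my_database'))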

Answer 3

Please note that I couldn’t get the above suggestion with database_exists to work, because whenever I check whether the database exists using if not database_exists(engine.url): I get this error:

InterfaceError(‘(pyodbc.InterfaceError) (\’28000\’, u\'[28000] [Microsoft][SQL Server Native Client 11.0][SQL Server]Login failed for user \\’myUser\\’. (18456) (SQLDriverConnect); [28000] [Microsoft][SQL Server Native Client 11.0][SQL Server]Cannot open database “MY_DATABASE” requested by the login. The login failed. (4060); [28000] [Microsoft][SQL Server Native Client 11.0][SQL Server]Login failed for user \\’myUser\\’. (18456); [28000] [Microsoft][SQL Server Native Client 11.0][SQL Server]Cannot open database “MY_DATABASE” requested by the login. The login failed. (4060)\’)’,)

Also contextlib/suppress was not working and I’m not using postgres so I ended up doing this to ignore the exception if the database happens to already exist with SQL Server:

import logging
from sqlalchemy import create_engine, text

logging.basicConfig(filename='app.log', format='%(asctime)s-%(levelname)s-%(message)s', level=logging.DEBUG)
# query-string options are separated with '&', not a second '?'
engine = create_engine('mssql+pyodbc://myUser:mypwd@localhost:1234/MY_DATABASE?driver=SQL+Server+Native+Client+11.0&trusted_connection=yes', isolation_level="AUTOCOMMIT")

# a_database_name (the name of the database to create) is assumed to be defined elsewhere
try:
    with engine.connect() as conn:
        conn.execute(text('CREATE DATABASE ' + a_database_name))
except Exception as db_exc:
    logging.exception("Exception creating database: " + str(db_exc))

How to write DataFrame to postgres table?

Question: How to write DataFrame to postgres table?

There is a DataFrame.to_sql method, but it works only for mysql, sqlite and oracle databases. I can't pass a postgres connection or sqlalchemy engine to this method.


Answer 0

Starting from pandas 0.14 (released end of May 2014), postgresql is supported. The sql module now uses sqlalchemy to support different database flavors. You can pass a sqlalchemy engine for a postgresql database (see docs). E.g.:

from sqlalchemy import create_engine
engine = create_engine('postgresql://scott:tiger@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

You are correct that in pandas up to version 0.13.1 postgresql was not supported. If you need to use an older version of pandas, here is a patched version of pandas.io.sql: https://gist.github.com/jorisvandenbossche/10841234.
(I wrote this a while ago, so I cannot fully guarantee that it always works, but the basis should be there.) If you put that file in your working directory and import it, then you should be able to do the following (where con is a postgresql connection):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

Answer 1

Faster option:

The following code will copy your Pandas DF to postgres DB much faster than df.to_sql method and you won’t need any intermediate csv file to store the df.

Create an engine based on your DB specifications.

Create a table in your postgres DB that has equal number of columns as the Dataframe (df).

Data in DF will get inserted in your postgres table.

from sqlalchemy import create_engine
import psycopg2 
import io

If you want to replace the table, you can recreate it with the normal to_sql method using only the headers from the df, and then load the entire large, time-consuming df into the DB.

engine = create_engine('postgresql+psycopg2://username:password@host:port/database')

df.head(0).to_sql('table_name', engine, if_exists='replace',index=False) # drops and recreates the table with the df's columns, without any rows

conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()

Answer 2

Pandas 0.24.0+ solution

In Pandas 0.24.0, to_sql gained a method parameter that accepts a custom insertion callable, and the documentation shows one built on PostgreSQL's COPY for fast writes. You can learn more about it here: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)
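To make it clearer what psql_insert_copy actually hands to PostgreSQL, here is a standalone sketch of its two building blocks, using only the standard library and no database. The helper names rows_to_csv_buffer and build_copy_sql are made up for this example; they mirror the buffer and the COPY statement built in the function above.

```python
import csv
from io import StringIO


def rows_to_csv_buffer(rows):
    """Serialize rows to an in-memory CSV buffer, rewound for reading."""
    buf = StringIO()
    csv.writer(buf).writerows(rows)
    buf.seek(0)
    return buf


def build_copy_sql(table_name, keys, schema=None):
    """Build the COPY statement the same way psql_insert_copy does."""
    columns = ", ".join('"{}"'.format(k) for k in keys)
    target = "{}.{}".format(schema, table_name) if schema else table_name
    return "COPY {} ({}) FROM STDIN WITH CSV".format(target, columns)


buf = rows_to_csv_buffer([(1, "a,b"), (2, None)])
payload = buf.getvalue()
copy_sql = build_copy_sql("table_name", ["id", "label"])
print(repr(payload))
print(copy_sql)
```

The csv module quotes the value containing a comma and writes None as an empty field, which COPY in CSV mode reads back as NULL by default.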

Answer 3

This is how I did it.

It may be faster because it is using execute_batch:

import psycopg2.extras

# df is the dataframe; conn (an open psycopg2 connection) and table
# (the target table name) are assumed to be defined elsewhere
if len(df) > 0:
    df_columns = list(df)
    # create col1,col2,...
    columns = ",".join(df_columns)

    # create VALUES(%s,%s,...) with one %s placeholder per column
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))

    # create INSERT INTO table (columns) VALUES(%s,...)
    insert_stmt = "INSERT INTO {} ({}) {}".format(table, columns, values)

    cur = conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()
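The string-building part of this answer can be checked without a database. Below is a small sketch of just that step. The function name build_insert_stmt and the sample table and column names are made up for illustration.

```python
def build_insert_stmt(table, columns):
    """Build an INSERT statement with one %s placeholder per column."""
    # The table and column names are interpolated directly into the SQL,
    # so they must come from trusted code, never from user input.
    cols = ",".join(columns)
    placeholders = ",".join(["%s"] * len(columns))
    return "INSERT INTO {} ({}) VALUES({})".format(table, cols, placeholders)


insert_stmt = build_insert_stmt("measurements", ["sensor", "reading"])
print(insert_stmt)
```

Only the row values go through the %s placeholders, so the driver escapes them; the resulting string is what gets passed to psycopg2.extras.execute_batch(cur, insert_stmt, rows).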

Answer 4

For Python 2.7 and Pandas 0.24.2 and using Psycopg2

Psycopg2 Connection Module

import io
import psycopg2
from psycopg2.extras import RealDictCursor

def dbConnect(db_parm, username_parm, host_parm, pw_parm):
    # Parse in connection information
    credentials = {'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm}
    conn = psycopg2.connect(**credentials)
    conn.autocommit = True  # auto-commit each entry to the database
    conn.cursor_factory = RealDictCursor
    cur = conn.cursor()
    print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
    return conn, cur

Connect to the database

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

Assuming dataframe to be present already as df

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDIN csv DELIMITER '\t' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()

How do I get a raw, compiled SQL query from a SQLAlchemy expression?

Question: How do I get a raw, compiled SQL query from a SQLAlchemy expression?

I have a SQLAlchemy query object and want to get the text of the compiled SQL statement, with all its parameters bound (e.g. no %s or other variables waiting to be bound by the statement compiler or MySQLdb dialect engine, etc).

Calling str() on the query reveals something like this:

SELECT id WHERE date_added <= %s AND date_added >= %s ORDER BY count DESC

I’ve tried looking in query._params but it’s an empty dict. I wrote my own compiler using this example of the sqlalchemy.ext.compiler.compiles decorator but even the statement there still has %s where I want data.

I can’t quite figure out when my parameters get mixed in to create the query; when examining the query object they’re always an empty dictionary (though the query executes fine and the engine prints it out when you turn echo logging on).

I’m starting to get the message that SQLAlchemy doesn’t want me to know the underlying query, as it breaks the general nature of the expression API’s interface to all the different DB-APIs. I don’t mind if the query gets executed before I find out what it was; I just want to know!


Answer 0

This blog provides an updated answer.

Quoting from the blog post, this is suggested and worked for me.

>>> from sqlalchemy.dialects import postgresql
>>> print(str(q.statement.compile(dialect=postgresql.dialect())))

Where q is defined as:

>>> q = DBSession.query(model.Name).distinct(model.Name.value) \
             .order_by(model.Name.value)

Or just any kind of session.query().

Thanks to Nicolas Cadou for the answer! I hope it helps others who come searching here.
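
As a rough illustration of the approach above, the sketch below compiles one statement against several dialects without connecting to any database. The table `names` and the column `title` are made up for the example, and the `select(column)` call style assumes SQLAlchemy 1.4 or newer.

```python
from sqlalchemy import column, select, table
from sqlalchemy.dialects import mysql, postgresql, sqlite

# Hypothetical lightweight table; no engine or connection is needed to compile.
names = table("names", column("title"))
stmt = select(names.c.title).where(names.c.title == "x")

# Each dialect renders bound parameters in its own placeholder style.
pg_sql = str(stmt.compile(dialect=postgresql.dialect()))
mysql_sql = str(stmt.compile(dialect=mysql.dialect()))
sqlite_sql = str(stmt.compile(dialect=sqlite.dialect()))

for label, sql in (("postgresql", pg_sql), ("mysql", mysql_sql), ("sqlite", sqlite_sql)):
    print(label, "->", sql)
```

The placeholders differ per dialect, which is why the string from str(q) alone does not tell you what a particular backend receives.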


Answer 1

The documentation uses literal_binds to print a query q including parameters:

print(q.statement.compile(compile_kwargs={"literal_binds": True}))

The above approach has the caveats that it is only supported for basic types, such as ints and strings, and furthermore, if a bindparam() without a pre-set value is used directly, it won’t be able to stringify that either.

The documentation also issues this warning:

Never use this technique with string content received from untrusted input, such as from web forms or other user-input applications. SQLAlchemy’s facilities to coerce Python values into direct SQL string values are not secure against untrusted input and do not validate the type of data being passed. Always use bound parameters when programmatically invoking non-DDL SQL statements against a relational database.
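
To make the difference concrete, here is a minimal sketch that compiles the same statement with and without literal_binds. The `users` table and its columns are invented for the example, only an int and a string are used (the basic types mentioned above), and the `select(column)` call style assumes SQLAlchemy 1.4 or newer.

```python
from sqlalchemy import column, select, table

# Hypothetical table used only for rendering; nothing is executed.
users = table("users", column("id"), column("name"))
stmt = select(users.c.id).where(users.c.name == "alice").where(users.c.id > 5)

# Default compilation keeps named placeholders such as :name_1.
with_placeholders = str(stmt.compile())

# literal_binds renders the Python values inline, for debugging output only.
with_literals = str(stmt.compile(compile_kwargs={"literal_binds": True}))

print(with_placeholders)
print(with_literals)
```

The second string is meant for reading and logging, not for sending to a database, for the reasons given in the warning above.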


Answer 2

This should work with SQLAlchemy >= 0.6 (the code below is written for Python 2):

from sqlalchemy.sql import compiler

from psycopg2.extensions import adapt as sqlescape
# or use the appropriate escape function from your db driver

def compile_query(query):
    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = {}
    for k,v in comp.params.iteritems():
        if isinstance(v, unicode):
            v = v.encode(enc)
        params[k] = sqlescape(v)
    return (comp.string.encode(enc) % params).decode(enc)

Answer 3

For the MySQLdb backend I modified albertov’s awesome answer (thanks so much!) a bit. I’m sure they could be merged to check if comp.positional was True but that’s slightly beyond the scope of this question.

def compile_query(query):
    from sqlalchemy.sql import compiler
    from MySQLdb.converters import conversions, escape

    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = []
    for k in comp.positiontup:
        v = comp.params[k]
        if isinstance(v, unicode):
            v = v.encode(enc)
        params.append( escape(v, conversions) )
    return (comp.string.encode(enc) % tuple(params)).decode(enc)

Answer 4

Thing is, sqlalchemy never mixes the data with your query. The query and the data are passed separately to your underlying database driver – the interpolation of data happens in your database.

Sqlalchemy passes the query as you’ve seen in str(myquery) to the database, and the values will go in a separate tuple.

You could use some approach where you interpolate the data with the query yourself (as albertov suggested below), but that’s not the same thing that sqlalchemy is executing.
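
The separation described here can be seen at the driver level without SQLAlchemy at all. The sketch below uses the standard-library sqlite3 module as a stand-in for "your underlying database driver"; the `users` table and its rows are made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("alice",), ("o'brien",)])

# The statement text and the values travel as two separate arguments;
# the quote inside the value never becomes part of the SQL string.
statement = "SELECT id, name FROM users WHERE name = ?"
parameters = ("o'brien",)
rows = conn.execute(statement, parameters).fetchall()

print(statement)
print(parameters)
print(rows)
```

This is the same shape of call that SQLAlchemy makes on your behalf, which is why there is no fully interpolated string lying around to inspect.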


Answer 5

First let me preface by saying that I assume you’re doing this mainly for debugging purposes — I wouldn’t recommend trying to modify the statement outside of the SQLAlchemy fluent API.

Unfortunately there doesn’t seem to be a simple way to show the compiled statement with the query parameters included. SQLAlchemy doesn’t actually put the parameters into the statement — they’re passed into the database engine as a dictionary. This lets the database-specific library handle things like escaping special characters to avoid SQL injection.

But you can do this in a two-step process reasonably easily. To get the statement, you can do as you’ve already shown, and just print the query:

>>> print(query)
SELECT field_1, field_2 FROM table WHERE id=%s;

You can get one step closer with query.statement, to see the parameter names. Note :id_1 below vs %s above — not really a problem in this very simple example, but could be key in a more complicated statement.

>>> print(query.statement)
>>> print(query.statement.compile()) # seems to be equivalent, you can also
                                     # pass in a dialect if you want
SELECT field_1, field_2 FROM table WHERE id=:id_1;

Then, you can get the actual values of the parameters by getting the params property of the compiled statement:

>>> print(query.statement.compile().params)
{u'id_1': 1} 

This worked for a MySQL backend at least; I would expect it’s also general enough for PostgreSQL without needing to use psycopg2.
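
For a version of the two-step process that runs without a database, here is a small sketch using a Core statement instead of an ORM query. The table `some_table` and its columns are placeholders invented for the example, and the `select(column, ...)` call style assumes SQLAlchemy 1.4 or newer.

```python
from sqlalchemy import column, select, table

# Hypothetical table definition; only used to build and compile a statement.
some_table = table("some_table", column("id"), column("field_1"), column("field_2"))
stmt = select(some_table.c.field_1, some_table.c.field_2).where(some_table.c.id == 1)

compiled = stmt.compile()

# Step 1: the SQL text with named placeholders.
sql_text = str(compiled)
# Step 2: the values that would be sent alongside it.
params = dict(compiled.params)

print(sql_text)
print(params)
```

Logging both pieces side by side is usually enough for debugging, and it avoids any home-grown escaping.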


Answer 6

For postgresql backend using psycopg2, you can listen for the do_execute event, then use the cursor, statement and type coerced parameters along with Cursor.mogrify() to inline the parameters. You can return True to prevent actual execution of the query.

import sqlalchemy

class QueryDebugger(object):
    def __init__(self, engine, query):
        with engine.connect() as connection:
            try:
                sqlalchemy.event.listen(engine, "do_execute", self.receive_do_execute)
                connection.execute(query)
            finally:
                sqlalchemy.event.remove(engine, "do_execute", self.receive_do_execute)

    def receive_do_execute(self, cursor, statement, parameters, context):
        self.statement = statement
        self.parameters = parameters
        self.query = cursor.mogrify(statement, parameters)
        # Don't actually execute
        return True

Sample usage:

>>> engine = sqlalchemy.create_engine("postgresql://postgres@localhost/test")
>>> metadata = sqlalchemy.MetaData()
>>> users = sqlalchemy.Table('users', metadata, sqlalchemy.Column("_id", sqlalchemy.String, primary_key=True), sqlalchemy.Column("document", sqlalchemy.dialects.postgresql.JSONB))
>>> s = sqlalchemy.select([users.c.document.label("foobar")]).where(users.c.document.contains({"profile": {"iid": "something"}}))
>>> q = QueryDebugger(engine, s)
>>> q.query
'SELECT users.document AS foobar \nFROM users \nWHERE users.document @> \'{"profile": {"iid": "something"}}\''
>>> q.statement
'SELECT users.document AS foobar \nFROM users \nWHERE users.document @> %(document_1)s'
>>> q.parameters
{'document_1': '{"profile": {"iid": "something"}}'}

Answer 7

The following solution uses the SQLAlchemy Expression Language and works with SQLAlchemy 1.1. This solution does not mix the parameters with the query (as requested by the original author), but provides a way of using SQLAlchemy models to generate SQL query strings and parameter dictionaries for different SQL dialects. The example is based on the tutorial http://docs.sqlalchemy.org/en/rel_1_0/core/tutorial.html

Given the class,

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class foo(Base):
    __tablename__ = 'foo'
    id = Column(Integer(), primary_key=True)
    name = Column(String(80), unique=True)
    value = Column(Integer())

we can produce a query statement using the select function.

from sqlalchemy.sql import select    
statement = select([foo.name, foo.value]).where(foo.value > 0)

Next, we can compile the statement into a query object.

query = statement.compile()

By default, the statement is compiled using a basic ‘named’ implementation that is compatible with SQL databases such as SQLite and Oracle. If you need to specify a dialect such as PostgreSQL, you can do

from sqlalchemy.dialects import postgresql
query = statement.compile(dialect=postgresql.dialect())

Or if you want to explicitly specify the dialect as SQLite, you can change the paramstyle from ‘qmark’ to ‘named’.

from sqlalchemy.dialects import sqlite
query = statement.compile(dialect=sqlite.dialect(paramstyle="named"))

From the query object, we can extract the query string and query parameters

query_str = str(query)
query_params = query.params

and finally execute the query.

conn.execute( query_str, query_params )
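
Newer SQLAlchemy releases may not accept a plain string in Connection.execute(), so as a hedged alternative the sketch below hands the compiled string and the parameter dictionary straight to the standard-library sqlite3 driver. It uses a lightweight table() in place of the declarative class above, the sample rows are invented, and the `select(column, ...)` call style assumes SQLAlchemy 1.4 or newer.

```python
import sqlite3

from sqlalchemy import column, select, table
from sqlalchemy.dialects import sqlite

# Lightweight stand-in for the foo model above.
foo = table("foo", column("name"), column("value"))
statement = select(foo.c.name, foo.c.value).where(foo.c.value > 0)

# "named" paramstyle yields :value_1 placeholders, which sqlite3 accepts with a dict.
compiled = statement.compile(dialect=sqlite.dialect(paramstyle="named"))
query_str = str(compiled)
query_params = dict(compiled.params)

# Throwaway in-memory database with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT, value INTEGER)")
conn.executemany("INSERT INTO foo (name, value) VALUES (?, ?)", [("a", 0), ("b", 3)])

rows = conn.execute(query_str, query_params).fetchall()
print(query_str)
print(query_params)
print(rows)
```

The query string and the parameters stay separate all the way to the driver, which matches the intent of the answer.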

Answer 8

You can use events from ConnectionEvents family: after_cursor_execute or before_cursor_execute.

In sqlalchemy UsageRecipes by @zzzeek you can find this example:

Profiling

...
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())
    logger.debug("Start Query: %s" % statement % parameters)
...

Here you can get access to your statement
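
A self-contained variant of the event approach, as a sketch only: it listens on a single in-memory SQLite engine rather than on the Engine class, and simply collects what the driver is about to receive. The `captured` list and the sample query are made up for the illustration.

```python
from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite://")
captured = []  # (statement, parameters) pairs seen just before cursor.execute()


@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    # statement is the final SQL string in the driver's placeholder style;
    # parameters are the values passed next to it.
    captured.append((statement, parameters))


with engine.connect() as conn:
    conn.execute(text("SELECT :a + :b"), {"a": 1, "b": 2})

last_statement, last_parameters = captured[-1]
print(last_statement, tuple(last_parameters))
```

Because the hook fires right before execution, it shows exactly what is sent, with the statement and parameters still separate.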


Answer 9

So, putting together a lot of little bits of these different answers, I came up with what I needed: a simple set of code to drop in and occasionally but reliably (i.e. handles all data types) grab the exact, compiled SQL sent to my Postgres backend by just interrogating the query itself:

from sqlalchemy.dialects import postgresql

query = [ .... some ORM query .... ]

compiled_query = query.statement.compile(
    dialect=postgresql.dialect(),
    compile_kwargs={"literal_binds": True}
)
mogrified_query = session.connection().connection.cursor().mogrify(
    str(compiled_query),
    compiled_query.params
)

print("compiled SQL = {s}".format(s=mogrified_query.decode()))

Answer 10

I think .statement would possibly do the trick: http://docs.sqlalchemy.org/en/latest/orm/query.html?highlight=query

>>> local_session.query(sqlalchemy_declarative.SomeTable.text).statement
<sqlalchemy.sql.annotation.AnnotatedSelect at 0x6c75a20; AnnotatedSelect object>
>>> x=local_session.query(sqlalchemy_declarative.SomeTable.text).statement
>>> print(x)
SELECT sometable.text 
FROM sometable

SQLAlchemy: Creating vs. Reusing a Session

Question: SQLAlchemy: Creating vs. Reusing a Session

Just a quick question: SQLAlchemy talks about calling sessionmaker() once but calling the resulting Session() class each time you need to talk to your DB. For me that means the second I would do my first session.add(x) or something similar, I would first do

from project import Session
session = Session()

What I did until now was to make the call session = Session() in my model once and then always import the same session anywhere in my application. Since this is a web application, this would usually mean the same thing (as one view is executed).

But where is the difference? What is the disadvantage of using one session all the time against using it for my database stuff until my function is done and then creating a new one the next time I want to talk to my DB?

I get that if I use multiple threads, each one should get their own session. But using scoped_session(), I already make sure that problem doesn’t exist, don’t I?

Please clarify if any of my assumptions are wrong.


Answer 0

sessionmaker() is a factory; it’s there to encourage placing configuration options for creating new Session objects in just one place. It is optional, in that you could just as easily call Session(bind=engine, expire_on_commit=False) anytime you needed a new Session, except that it’s verbose and redundant, and I wanted to stop the proliferation of small-scale “helpers” that each approached the issue of this redundancy in some new and more confusing way.

So sessionmaker() is just a tool to help you create Session objects when you need them.

Next part. I think the question is, what’s the difference between making a new Session() at various points versus just using one all the way through. The answer, not very much. Session is a container for all the objects you put into it, and then it also keeps track of an open transaction. At the moment you call rollback() or commit(), the transaction is over, and the Session has no connection to the database until it is called upon to emit SQL again. The links it holds to your mapped objects are weak referencing, provided the objects are clean of pending changes, so even in that regard the Session will empty itself out back to a brand new state when your application loses all references to mapped objects. If you leave it with its default "expire_on_commit" setting, then all the objects are expired after a commit. If that Session hangs around for five or twenty minutes, and all kinds of things have changed in the database the next time you use it, it will load all brand new state the next time you access those objects even though they’ve been sitting in memory for twenty minutes.

In web applications, we usually say, hey why don’t you make a brand new Session on each request, rather than using the same one over and over again. This practice ensures that the new request begins “clean”. If some objects from the previous request haven’t been garbage collected yet, and if maybe you’ve turned off "expire_on_commit", maybe some state from the previous request is still hanging around, and that state might even be pretty old. If you’re careful to leave expire_on_commit turned on and to definitely call commit() or rollback() at request end, then it’s fine, but if you start with a brand new Session, then there’s not even any question that you’re starting clean. So the idea to start each request with a new Session is really just the simplest way to make sure you’re starting fresh, and to make the usage of expire_on_commit pretty much optional, as this flag can incur a lot of extra SQL for an operation that calls commit() in the middle of a series of operations. Not sure if this answers your question.
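
A minimal sketch of the "brand new Session per request" pattern described above, assuming an in-memory SQLite engine. The function name `handle_request` is hypothetical and stands in for whatever your framework calls per request.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite://")
# Configure the factory once, at import time.
Session = sessionmaker(bind=engine)


def handle_request():
    # Hypothetical request handler: a fresh Session for every call.
    session = Session()
    try:
        value = session.execute(text("SELECT 1")).scalar()
        session.commit()
        return value
    except Exception:
        session.rollback()
        raise
    finally:
        # Closing releases the connection and discards any leftover state.
        session.close()


first = handle_request()
second = handle_request()
print(first, second)
```

Each call starts with an empty Session, so nothing from an earlier request can leak into a later one.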

The next round is what you mention about threading. If your app is multithreaded, we recommend making sure the Session in use is local to…something. scoped_session() by default makes it local to the current thread. In a web app, local to the request is in fact even better. Flask-SQLAlchemy actually sends a custom “scope function” to scoped_session() so that you get a request-scoped session. The average Pyramid application sticks the Session into the “request” registry. When using schemes like these, the “create new Session on request start” idea continues to look like the most straightforward way to keep things straight.
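
To illustrate the default thread-local behaviour of scoped_session(), here is a small sketch, again assuming an in-memory SQLite engine. The `sessions` dictionary and the `worker` function are invented for the demonstration.

```python
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")
ScopedSession = scoped_session(sessionmaker(bind=engine))

# Within one thread, repeated calls hand back the same Session object.
main_session = ScopedSession()
same_thread_again = ScopedSession()

sessions = {}


def worker():
    # A different thread gets its own, separate Session from the registry.
    sessions["worker"] = ScopedSession()
    ScopedSession.remove()  # discard this thread's Session when done


thread = threading.Thread(target=worker)
thread.start()
thread.join()

print(main_session is same_thread_again)
print(sessions["worker"] is main_session)

ScopedSession.remove()  # the usual end-of-request cleanup for the main thread
```

Calling remove() at the end of each unit of work is what gives you the fresh start described above while still using a single global name.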


Answer 1

In addition to zzzeek’s excellent answer, here’s a simple recipe to quickly create throwaway, self-enclosed sessions:

from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

@contextmanager
def db_session(db_url):
    """ Creates a context with an open SQLAlchemy session.
    """
    # convert_unicode is omitted here: it has been deprecated since SQLAlchemy 1.3
    engine = create_engine(db_url)
    connection = engine.connect()
    db_session = scoped_session(sessionmaker(autocommit=False, autoflush=True, bind=engine))
    try:
        yield db_session
    finally:
        # always release the session and the connection, even if the block raises
        db_session.close()
        connection.close()

Usage:

from mymodels import Foo

with db_session("sqlite://") as db:
    foos = db.query(Foo).all()

SQLAlchemy: Printing the actual query

Question: SQLAlchemy: Printing the actual query

I’d really like to be able to print out valid SQL for my application, including values, rather than bind parameters, but it’s not obvious how to do this in SQLAlchemy (by design, I’m fairly sure).

Has anyone solved this problem in a general way?


Answer 0

In the vast majority of cases, the “stringification” of a SQLAlchemy statement or query is as simple as:

print(str(statement))

This applies both to an ORM Query as well as any select() or other statement.

Note: the following detailed answer is being maintained on the sqlalchemy documentation.

To get the statement as compiled to a specific dialect or engine, if the statement itself is not already bound to one you can pass this in to compile():

print(statement.compile(someengine))

or without an engine:

from sqlalchemy.dialects import postgresql
print(statement.compile(dialect=postgresql.dialect()))

When given an ORM Query object, in order to get at the compile() method we only need to access the .statement accessor first:

statement = query.statement
print(statement.compile(someengine))

With regard to the original stipulation that bound parameters are to be “inlined” into the final string, the challenge here is that SQLAlchemy normally is not tasked with this, as this is handled appropriately by the Python DBAPI, not to mention that bypassing bound parameters is probably the most widely exploited security hole in modern web applications. SQLAlchemy has limited ability to do this stringification in certain circumstances such as that of emitting DDL. In order to access this functionality one can use the ‘literal_binds’ flag, passed to compile_kwargs:

from sqlalchemy.sql import table, column, select

t = table('t', column('x'))

s = select([t]).where(t.c.x == 5)

print(s.compile(compile_kwargs={"literal_binds": True}))

The above approach has the caveats that it is only supported for basic types, such as ints and strings, and furthermore, if a bindparam without a pre-set value is used directly, it won’t be able to stringify that either.

To support inline literal rendering for types not supported, implement a TypeDecorator for the target type which includes a TypeDecorator.process_literal_param method:

from sqlalchemy import TypeDecorator, Integer


class MyFancyType(TypeDecorator):
    impl = Integer

    def process_literal_param(self, value, dialect):
        return "my_fancy_formatting(%s)" % value

from sqlalchemy import Table, Column, MetaData

tab = Table('mytable', MetaData(), Column('x', MyFancyType()))

print(
    tab.select().where(tab.c.x > 5).compile(
        compile_kwargs={"literal_binds": True})
)

producing output like:

SELECT mytable.x
FROM mytable
WHERE mytable.x > my_fancy_formatting(5)

Answer 1

This works in python 2 and 3 and is a bit cleaner than before, but requires SA>=1.0.

from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.sql.sqltypes import String, DateTime, NullType

# python2/3 compatible.
PY3 = str is not bytes
text = str if PY3 else unicode
int_type = int if PY3 else (int, long)
str_type = str if PY3 else (str, unicode)


class StringLiteral(String):
    """Teach SA how to literalize various things."""
    def literal_processor(self, dialect):
        super_processor = super(StringLiteral, self).literal_processor(dialect)

        def process(value):
            if isinstance(value, int_type):
                return text(value)
            if not isinstance(value, str_type):
                value = text(value)
            result = super_processor(value)
            if isinstance(result, bytes):
                result = result.decode(dialect.encoding)
            return result
        return process


class LiteralDialect(DefaultDialect):
    colspecs = {
        # prevent various encoding explosions
        String: StringLiteral,
        # teach SA about how to literalize a datetime
        DateTime: StringLiteral,
        # don't format py2 long integers to NULL
        NullType: StringLiteral,
    }


def literalquery(statement):
    """NOTE: This is entirely insecure. DO NOT execute the resulting strings."""
    import sqlalchemy.orm
    if isinstance(statement, sqlalchemy.orm.Query):
        statement = statement.statement
    return statement.compile(
        dialect=LiteralDialect(),
        compile_kwargs={'literal_binds': True},
    ).string

Demo:

# coding: UTF-8
from datetime import datetime
from decimal import Decimal

from literalquery import literalquery


def test():
    from sqlalchemy.sql import table, column, select

    mytable = table('mytable', column('mycol'))
    values = (
        5,
        u'snowman: ☃',
        b'UTF-8 snowman: \xe2\x98\x83',
        datetime.now(),
        Decimal('3.14159'),
        10 ** 20,  # a long integer
    )

    statement = select([mytable]).where(mytable.c.mycol.in_(values)).limit(1)
    print(literalquery(statement))


if __name__ == '__main__':
    test()

Gives this output: (tested in python 2.7 and 3.4)

SELECT mytable.mycol
FROM mytable
WHERE mytable.mycol IN (5, 'snowman: ☃', 'UTF-8 snowman: ☃',
      '2015-06-24 18:09:29.042517', 3.14159, 100000000000000000000)
 LIMIT 1

Answer 2

Given that what you want makes sense only when debugging, you could start SQLAlchemy with echo=True, to log all SQL queries. For example:

engine = create_engine(
    "mysql://scott:tiger@hostname/dbname",
    encoding="latin1",
    echo=True,
)

This can also be modified for just a single request:

echo=False – if True, the Engine will log all statements as well as a repr() of their parameter lists to the engines logger, which defaults to sys.stdout. The echo attribute of Engine can be modified at any time to turn logging on and off. If set to the string "debug", result rows will be printed to the standard output as well. This flag ultimately controls a Python logger; see Configuring Logging for information on how to configure logging directly.

Source: SQLAlchemy Engine Configuration

If used with Flask, you can simply set

app.config["SQLALCHEMY_ECHO"] = True

to get the same behaviour.
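The quoted documentation notes that the echo flag ultimately controls a Python logger. A minimal sketch of doing the same thing through the standard logging module follows; the helper name set_sql_logging is hypothetical, while "sqlalchemy.engine" is the logger name SQLAlchemy documents for statement output. It is probably safest to call this before the engine opens its connections.

```python
import logging


def set_sql_logging(enabled):
    """Hypothetical helper: switch SQLAlchemy statement logging on or off."""
    # Make sure some handler exists so log records are actually printed.
    logging.basicConfig()
    logger = logging.getLogger("sqlalchemy.engine")
    # INFO logs the SQL statements; WARNING silences them again.
    logger.setLevel(logging.INFO if enabled else logging.WARNING)
    return logger


sql_logger = set_sql_logging(True)
```

Setting the level to logging.DEBUG instead should also log result rows, which corresponds to echo="debug".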


Answer 3

We can use the compile() method for this purpose. From the docs:

from sqlalchemy.sql import text
from sqlalchemy.dialects import postgresql

stmt = text("SELECT * FROM users WHERE users.name BETWEEN :x AND :y")
stmt = stmt.bindparams(x="m", y="z")

print(stmt.compile(dialect=postgresql.dialect(),compile_kwargs={"literal_binds": True}))

Result:

SELECT * FROM users WHERE users.name BETWEEN 'm' AND 'z'

Warning from docs:

Never use this technique with string content received from untrusted input, such as from web forms or other user-input applications. SQLAlchemy’s facilities to coerce Python values into direct SQL string values are not secure against untrusted input and do not validate the type of data being passed. Always use bound parameters when programmatically invoking non-DDL SQL statements against a relational database.
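The same flag also works for statements built with the expression language rather than text(). The sketch below assumes SQLAlchemy 1.4 or later (for the select(column) calling style) and uses a made-up users table; it compares the default string form, which keeps placeholders, with the literal_binds form. It is for debugging output only, as the warning above says.

```python
from sqlalchemy import String, column, select, table

# Lightweight table definition used only for illustration (assumed schema).
users = table("users", column("name", String))
stmt = select(users.c.name).where(users.c.name.between("m", "z"))

# Default rendering: values stay out of the SQL as bound parameters.
with_placeholders = str(stmt)

# Debug rendering: values are inlined as SQL literals.
inlined = str(stmt.compile(compile_kwargs={"literal_binds": True}))

print(with_placeholders)
print(inlined)
```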


Answer 4

So building on @zzzeek’s comments on @bukzor’s code I came up with this to easily get a “pretty-printable” query:

def prettyprintable(statement, dialect=None, reindent=True):
    """Generate an SQL expression string with bound parameters rendered inline
    for the given SQLAlchemy statement. The function can also receive a
    `sqlalchemy.orm.Query` object instead of a statement.

    WARNING: Should only be used for debugging. Inlining parameters is not
             safe when handling user-created data.
    """
    import sqlparse
    import sqlalchemy.orm
    if isinstance(statement, sqlalchemy.orm.Query):
        if dialect is None:
            dialect = statement.session.get_bind().dialect
        statement = statement.statement
    compiled = statement.compile(dialect=dialect,
                                 compile_kwargs={'literal_binds': True})
    return sqlparse.format(str(compiled), reindent=reindent)

I personally have a hard time reading code that is not indented, so I’ve used sqlparse to reindent the SQL. It can be installed with pip install sqlparse.


Answer 5

This code is based on the brilliant existing answer from @bukzor. I just added custom rendering of the datetime.datetime type as Oracle’s TO_DATE().

Feel free to update the code to suit your database:

import decimal
import datetime

def printquery(statement, bind=None):
    """
    print a query, with values filled in
    for debugging purposes *only*
    for security, you should always separate queries from their values
    please also note that this function is quite slow
    """
    import sqlalchemy.orm
    if isinstance(statement, sqlalchemy.orm.Query):
        if bind is None:
            bind = statement.session.get_bind(
                    statement._mapper_zero_or_none()
            )
        statement = statement.statement
    elif bind is None:
        bind = statement.bind 

    dialect = bind.dialect
    compiler = statement._compiler(dialect)
    class LiteralCompiler(compiler.__class__):
        def visit_bindparam(
                self, bindparam, within_columns_clause=False, 
                literal_binds=False, **kwargs
        ):
            return super(LiteralCompiler, self).render_literal_bindparam(
                    bindparam, within_columns_clause=within_columns_clause,
                    literal_binds=literal_binds, **kwargs
            )
        def render_literal_value(self, value, type_):
            """Render the value of a bind parameter as a quoted literal.

            This is used for statement sections that do not accept bind parameters
            on the target driver/database.

            This should be implemented by subclasses using the quoting services
            of the DBAPI.

            """
            if isinstance(value, basestring):
                value = value.replace("'", "''")
                return "'%s'" % value
            elif value is None:
                return "NULL"
            elif isinstance(value, (float, int, long)):
                return repr(value)
            elif isinstance(value, decimal.Decimal):
                return str(value)
            elif isinstance(value, datetime.datetime):
                return "TO_DATE('%s','YYYY-MM-DD HH24:MI:SS')" % value.strftime("%Y-%m-%d %H:%M:%S")

            else:
                raise NotImplementedError(
                            "Don't know how to literal-quote value %r" % value)            

    compiler = LiteralCompiler(dialect, statement)
    print(compiler.process(statement))

Answer 6

I would like to point out that the solutions given above do not “just work” with non-trivial queries. One issue I came across was more complicated types, such as pgsql ARRAYs. I did find a solution that, for me, did just work even with pgsql ARRAYs:

Borrowed from: https://gist.github.com/gsakkis/4572159

The linked code seems to be based on an older version of SQLAlchemy, so you’ll get an error saying that the attribute _mapper_zero_or_none doesn’t exist. Here’s an updated version that works with newer versions; it simply replaces _mapper_zero_or_none with bind. Additionally, it supports pgsql arrays:

# adapted from:
# https://gist.github.com/gsakkis/4572159
from datetime import date, timedelta
from datetime import datetime

from sqlalchemy.orm import Query


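try:
    basestring
    long
except NameError:
    # Python 3 has neither basestring nor long; alias them so the
    # isinstance checks below keep working.
    basestring = str
    long = int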


def render_query(statement, dialect=None):
    """
    Generate an SQL expression string with bound parameters rendered inline
    for the given SQLAlchemy statement.
    WARNING: This method of escaping is insecure, incomplete, and for debugging
    purposes only. Executing SQL statements with inline-rendered user values is
    extremely insecure.
    Based on http://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query
    """
    if isinstance(statement, Query):
        if dialect is None:
            dialect = statement.session.bind.dialect
        statement = statement.statement
    elif dialect is None:
        dialect = statement.bind.dialect

    class LiteralCompiler(dialect.statement_compiler):

        def visit_bindparam(self, bindparam, within_columns_clause=False,
                            literal_binds=False, **kwargs):
            return self.render_literal_value(bindparam.value, bindparam.type)

        def render_array_value(self, val, item_type):
            if isinstance(val, list):
                return "{%s}" % ",".join([self.render_array_value(x, item_type) for x in val])
            return self.render_literal_value(val, item_type)

        def render_literal_value(self, value, type_):
            if isinstance(value, long):
                return str(value)
            elif isinstance(value, (basestring, date, datetime, timedelta)):
                return "'%s'" % str(value).replace("'", "''")
            elif isinstance(value, list):
                return "'{%s}'" % (",".join([self.render_array_value(x, type_.item_type) for x in value]))
            return super(LiteralCompiler, self).render_literal_value(value, type_)

    return LiteralCompiler(dialect, statement).process(statement)

Tested to two levels of nested arrays.


Does SQLAlchemy have an equivalent of Django’s get_or_create?

Question: Does SQLAlchemy have an equivalent of Django’s get_or_create?

I want to get an object from the database if it already exists (based on provided parameters) or create it if it does not.

Django’s get_or_create (or source) does this. Is there an equivalent shortcut in SQLAlchemy?

I’m currently writing it out explicitly like this:

def get_or_create_instrument(session, serial_number):
    instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
    if instrument:
        return instrument
    else:
        instrument = Instrument(serial_number)
        session.add(instrument)
        return instrument

Answer 0

That’s basically the way to do it; there is no shortcut readily available, AFAIK.

You could generalize it, of course:

from sqlalchemy.sql.expression import ClauseElement


def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    else:
        # SQL expression arguments are usable as filters but not as
        # constructor values, so leave them out when creating.
        params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
        params.update(defaults or {})
        instance = model(**params)
        session.add(instance)
        return instance, True

2020 update

Here is a cleaner version using Python 3.9’s new dict union operator (|=). Note that it returns only the instance, not an (instance, created) tuple:

def get_or_create(session, Model, defaults=None, **kwargs):
    instance = session.query(Model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        kwargs |= defaults or {}
        instance = Model(**kwargs)
        session.add(instance)
        return instance
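To see how the (instance, created) tuple from the first version behaves, here is a small self-contained sketch against an in-memory SQLite database. It assumes SQLAlchemy 1.4 or later; the Country model is only an illustrative stand-in, and the get_or_create body is a condensed variant of the code above without the ClauseElement filtering.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Country(Base):
    # Illustrative model, not part of the original answer.
    __tablename__ = "countries"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)


def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    # Lookup arguments plus optional defaults become constructor arguments.
    instance = model(**{**kwargs, **(defaults or {})})
    session.add(instance)
    return instance, True


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    first, created_first = get_or_create(session, Country, name="Peru")
    # The second lookup autoflushes the pending row and finds it.
    second, created_second = get_or_create(session, Country, name="Peru")
    session.commit()
```

The helper never commits, so the caller decides when the transaction ends.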

Answer 1

Following the solution of @WoLpH, this is the code that worked for me (simple version):

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance

With this, I’m able to get_or_create any object of my model.

Suppose my model object is:

class Country(Base):
    __tablename__ = 'countries'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

To get or create my object, I write:

myCountry = get_or_create(session, Country, name=countryName)

Answer 2

I’ve been playing with this problem and have ended up with a fairly robust solution:

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound


def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            # Another process inserted the row first: undo and re-read it.
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

I just wrote a fairly expansive blog post on all the details, but here are a few quick notes on why I use this.

  1. It unpacks to a tuple that tells you if the object existed or not. This can often be useful in your workflow.

  2. The function gives the ability to work with @classmethod decorated creator functions (and attributes specific to them).

  3. The solution protects against Race Conditions when you have more than one process connected to the datastore.

EDIT: I’ve changed session.commit() to session.flush() as explained in this blog post. Note that these decisions are specific to the datastore used (Postgres in this case).

EDIT 2: I’ve stopped using {} as a default argument value in the function, since a mutable default is a typical Python gotcha. Thanks for the comment, Nigel! If you’re curious about this gotcha, check out this StackOverflow question and this blog post.
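The gotcha mentioned in EDIT 2 can be reproduced in a few lines. The function names below are made up for illustration; the second one uses a None sentinel, which is the same idea as the create_method_kwargs=None default combined with `create_method_kwargs or {}` in the answer's code.

```python
def append_shared(item, bucket=[]):
    # The default list is created once, when the function is defined,
    # so every call that omits `bucket` mutates the same object.
    bucket.append(item)
    return bucket


def append_fresh(item, bucket=None):
    # None is a safe sentinel: a new list is built on each call
    # that omits `bucket`.
    bucket = [] if bucket is None else bucket
    bucket.append(item)
    return bucket
```

Calling append_shared twice without a bucket returns a list that keeps growing, while append_fresh starts from an empty list each time.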


Answer 3

A modified version of erik’s excellent answer

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound


def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    # Note: in this version the flag is True when the row already existed.
    try:
        return session.query(model).filter_by(**kwargs).one(), True
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        try:
            with session.begin_nested():
                created = getattr(model, create_method, model)(**kwargs)
                session.add(created)
            return created, False
        except IntegrityError:
            return session.query(model).filter_by(**kwargs).one(), True

  • Use a nested transaction to roll back only the addition of the new item instead of rolling back everything (see this answer to use nested transactions with SQLite).
  • Move create_method inside the with block. If the created object has relationships and is assigned members through them, it is automatically added to the session. E.g. if a book has user_id and a corresponding user relationship, then doing book.user=<user object> inside create_method adds book to the session. This means that create_method must be inside the with block to benefit from an eventual rollback. Note that begin_nested automatically triggers a flush.

Note that if using MySQL, the transaction isolation level must be set to READ COMMITTED rather than REPEATABLE READ for this to work. Django’s get_or_create (and here) uses the same stratagem; see also the Django documentation.
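What the nested transaction buys you can be illustrated without SQLAlchemy at all. The sketch below uses the standard-library sqlite3 module and a raw SAVEPOINT, which is the database feature that nested transactions are built on; the widget table and the helper name are assumptions made for the example. Only the failed insert is undone, and the rest of the outer transaction survives.

```python
import sqlite3


def insert_or_ignore_duplicate(conn, name):
    """Try to insert `name` inside a savepoint; undo only that insert on conflict."""
    conn.execute("SAVEPOINT add_widget")
    try:
        conn.execute("INSERT INTO widget (name) VALUES (?)", (name,))
        created = True
    except sqlite3.IntegrityError:
        # Roll back to the savepoint only; earlier work in the outer
        # transaction is left untouched.
        conn.execute("ROLLBACK TO SAVEPOINT add_widget")
        created = False
    conn.execute("RELEASE SAVEPOINT add_widget")
    return created


# isolation_level=None lets us issue BEGIN/COMMIT ourselves.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE widget (name TEXT UNIQUE)")

conn.execute("BEGIN")
results = [insert_or_ignore_duplicate(conn, n) for n in ("a", "a", "b")]
conn.execute("COMMIT")

names = [row[0] for row in conn.execute("SELECT name FROM widget ORDER BY name")]
```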


Answer 4

This SQLAlchemy recipe does the job nicely and elegantly.

The first thing to do is to define a function that is given a Session to work with, and associates a dictionary with the Session() which keeps track of current unique keys.

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
    cache = getattr(session, '_unique_cache', None)
    if cache is None:
        session._unique_cache = cache = {}

    key = (cls, hashfunc(*arg, **kw))
    if key in cache:
        return cache[key]
    else:
        with session.no_autoflush:
            q = session.query(cls)
            q = queryfunc(q, *arg, **kw)
            obj = q.first()
            if not obj:
                obj = constructor(*arg, **kw)
                session.add(obj)
        cache[key] = obj
        return obj

An example of utilizing this function would be in a mixin:

class UniqueMixin(object):
    @classmethod
    def unique_hash(cls, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        return _unique(
                    session,
                    cls,
                    cls.unique_hash,
                    cls.unique_filter,
                    cls,
                    arg, kw
            )

And finally creating the unique get_or_create model:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

engine = create_engine('sqlite://', echo=True)

Session = sessionmaker(bind=engine)

class Widget(UniqueMixin, Base):
    __tablename__ = 'widget'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    @classmethod
    def unique_hash(cls, name):
        return name

    @classmethod
    def unique_filter(cls, query, name):
        return query.filter(Widget.name == name)

Base.metadata.create_all(engine)

session = Session()

w1, w2, w3 = Widget.as_unique(session, name='w1'), \
                Widget.as_unique(session, name='w2'), \
                Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

session.commit()

The recipe goes deeper into the idea and provides different approaches but I’ve used this one with great success.


Answer 5

The closest semantically is probably:

def get_or_create(model, **kwargs):
    """SqlAlchemy implementation of Django's get_or_create.
    """
    session = Session()
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance, True

Not sure how kosher it is to rely on a globally defined Session in SQLAlchemy, but the Django version doesn’t take a connection, so…

The tuple returned contains the instance and a boolean indicating if the instance was created (i.e. it’s False if we read the instance from the db).

Django’s get_or_create is often used to make sure that global data is available, so I’m committing at the earliest point possible.


Answer 6

I slightly simplified @Kevin’s solution to avoid wrapping the whole function in an if/else statement. This way there’s only one return, which I find cleaner:

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()

    if not instance:
        instance = model(**kwargs)
        session.add(instance)

    return instance

Answer 7

Depending on the isolation level you adopted, none of the above solutions would work. The best solution I have found is raw SQL of the following form:

INSERT INTO table(f1, f2, unique_f3) 
SELECT 'v1', 'v2', 'v3' 
WHERE NOT EXISTS (SELECT 1 FROM table WHERE unique_f3 = 'v3')

This is transactionally safe whatever the isolation level and the degree of parallelism are.

Beware: in order to make it efficient, it would be wise to have an INDEX for the unique column.
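As a rough illustration of that statement, the sketch below runs it through the standard-library sqlite3 module with bound parameters instead of inlined values. The items table and the insert_if_missing helper are assumptions for the example, and SQLite stands in for whatever database is actually used, so this shows the shape of the query rather than its behaviour under concurrency.

```python
import sqlite3


def insert_if_missing(conn, f1, f2, unique_f3):
    """Insert a row unless one with the same unique_f3 exists; return True if inserted."""
    cur = conn.execute(
        "INSERT INTO items (f1, f2, unique_f3) "
        "SELECT ?, ?, ? "
        "WHERE NOT EXISTS (SELECT 1 FROM items WHERE unique_f3 = ?)",
        (f1, f2, unique_f3, unique_f3),
    )
    # rowcount is the number of rows the INSERT actually added (0 or 1 here).
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
# The UNIQUE constraint also gives the column the index recommended above.
conn.execute("CREATE TABLE items (f1 TEXT, f2 TEXT, unique_f3 TEXT UNIQUE)")

first = insert_if_missing(conn, "v1", "v2", "v3")
second = insert_if_missing(conn, "x1", "x2", "v3")
row_count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
```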