问题:如何在Alembic升级脚本中执行插入和更新?

我需要在Alembic升级期间更改数据。

我目前在第一个修订版中有一个“玩家”表:

def upgrade():
    op.create_table('player',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.Unicode(length=200), nullable=False),
        sa.Column('position', sa.Unicode(length=200), nullable=True),
        sa.Column('team', sa.Unicode(length=100), nullable=True)
        sa.PrimaryKeyConstraint('id')
    )

我想介绍一个“团队”表。我创建了第二个修订版:

def upgrade():
    op.create_table('teams',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=80), nullable=False)
    )
    op.add_column('players', sa.Column('team_id', sa.Integer(), nullable=False))

我希望第二次迁移也添加以下数据:

  1. 填充团队表:

    INSERT INTO teams (name) SELECT DISTINCT team FROM players;
  2. 根据players.team名称更新players.team_id:

    UPDATE players AS p JOIN teams AS t SET p.team_id = t.id WHERE p.team = t.name;

如何在升级脚本中执行插入和更新?

I need to alter data during an Alembic upgrade.

I currently have a ‘players’ table in a first revision:

def upgrade():
    op.create_table('player',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.Unicode(length=200), nullable=False),
        sa.Column('position', sa.Unicode(length=200), nullable=True),
        sa.Column('team', sa.Unicode(length=100), nullable=True)
        sa.PrimaryKeyConstraint('id')
    )

I want to introduce a ‘teams’ table. I’ve created a second revision:

def upgrade():
    op.create_table('teams',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=80), nullable=False)
    )
    op.add_column('players', sa.Column('team_id', sa.Integer(), nullable=False))

I would like the second migration to also add the following data:

  1. Populate teams table:

    INSERT INTO teams (name) SELECT DISTINCT team FROM players;
    
  2. Update players.team_id based on players.team name:

    UPDATE players AS p JOIN teams AS t SET p.team_id = t.id WHERE p.team = t.name;
    

How do I execute inserts and updates inside the upgrade script?


回答 0

您需要的是数据迁移,而不是Alembic文档中最普遍的模式迁移

该答案假设您使用声明式(而不是class-Mapper-Table或core)定义模型。使它适应其他形式应该相对简单。

请注意,Alembic提供了一些基本数据功能:op.bulk_insert()op.execute()。如果操作很少,请使用这些操作。如果迁移需要关系或其他复杂的交互作用,我更喜欢使用模型和会话的全部功能,如下所述。

以下是示例迁移脚本,该脚本设置了一些声明性模型,这些声明性模型将用于处理会话中的数据。关键点是:

  1. 使用所需的列定义所需的基本模型。您不需要每一列,只需要主键和将要使用的那些。

  2. 在升级功能中,用于op.get_bind()获取当前连接并与其建立会话。

    • 或者使用bind.execute()SQLAlchemy的较低级别直接编写SQL查询。这对于简单的迁移很有用。
  3. 像通常在应用程序中一样使用模型和会话。

"""create teams table

Revision ID: 169ad57156f0
Revises: 29b4c2bfce6d
Create Date: 2014-06-25 09:00:06.784170
"""

revision = '169ad57156f0'
down_revision = '29b4c2bfce6d'

from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Player(Base):
    __tablename__ = 'players'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False)
    team_name = sa.Column('team', sa.String, nullable=False)
    team_id = sa.Column(sa.Integer, sa.ForeignKey('teams.id'), nullable=False)

    team = orm.relationship('Team', backref='players')


class Team(Base):
    __tablename__ = 'teams'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False, unique=True)


def upgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # create the teams table and the players.team_id column
    Team.__table__.create(bind)
    op.add_column('players', sa.Column('team_id', sa.ForeignKey('teams.id'), nullable=False)

    # create teams for each team name
    teams = {name: Team(name=name) for name in session.query(Player.team).distinct()}
    session.add_all(teams.values())

    # set player team based on team name
    for player in session.query(Player):
        player.team = teams[player.team_name]

    session.commit()

    # don't need team name now that team relationship is set
    op.drop_column('players', 'team')


def downgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # re-add the players.team column
    op.add_column('players', sa.Column('team', sa.String, nullable=False)

    # set players.team based on team relationship
    for player in session.query(Player):
        player.team_name = player.team.name

    session.commit()

    op.drop_column('players', 'team_id')
    op.drop_table('teams')

迁移定义了单独的模型,因为代码中的模型表示数据库的当前状态,而迁移表示过程中的步骤。您的数据库可能沿着该路径处于任何状态,因此模型可能尚未与数据库同步。除非您非常小心,否则直接使用真实模型会导致缺少列,无效数据等问题。更清晰地明确说明要在迁移中使用的列和模型。

What you are asking for is a data migration, as opposed to the schema migration that is most prevalent in the Alembic docs.

This answer assumes you are using declarative (as opposed to class-Mapper-Table or core) to define your models. It should be relatively straightforward to adapt this to the other forms.

Note that Alembic provides some basic data functions: op.bulk_insert() and op.execute(). If the operations are fairly minimal, use those. If the migration requires relationships or other complex interactions, I prefer to use the full power of models and sessions as described below.

The following is an example migration script that sets up some declarative models that will be used to manipulate data in a session. The key points are:

  1. Define the basic models you need, with the columns you’ll need. You don’t need every column, just the primary key and the ones you’ll be using.

  2. Within the upgrade function, use op.get_bind() to get the current connection, and make a session with it.

    • Or use bind.execute() to use SQLAlchemy’s lower level to write SQL queries directly. This is useful for simple migrations.
  3. Use the models and session as you normally would in your application.

"""create teams table

Revision ID: 169ad57156f0
Revises: 29b4c2bfce6d
Create Date: 2014-06-25 09:00:06.784170
"""

revision = '169ad57156f0'
down_revision = '29b4c2bfce6d'

from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Player(Base):
    __tablename__ = 'players'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False)
    team_name = sa.Column('team', sa.String, nullable=False)
    team_id = sa.Column(sa.Integer, sa.ForeignKey('teams.id'), nullable=False)

    team = orm.relationship('Team', backref='players')


class Team(Base):
    __tablename__ = 'teams'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False, unique=True)


def upgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # create the teams table and the players.team_id column
    Team.__table__.create(bind)
    op.add_column('players', sa.Column('team_id', sa.ForeignKey('teams.id'), nullable=False)

    # create teams for each team name
    teams = {name: Team(name=name) for name in session.query(Player.team).distinct()}
    session.add_all(teams.values())

    # set player team based on team name
    for player in session.query(Player):
        player.team = teams[player.team_name]

    session.commit()

    # don't need team name now that team relationship is set
    op.drop_column('players', 'team')


def downgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # re-add the players.team column
    op.add_column('players', sa.Column('team', sa.String, nullable=False)

    # set players.team based on team relationship
    for player in session.query(Player):
        player.team_name = player.team.name

    session.commit()

    op.drop_column('players', 'team_id')
    op.drop_table('teams')

The migration defines separate models because the models in your code represent the current state of the database, while the migrations represent steps along the way. Your database might be in any state along that path, so the models might not sync up with the database yet. Unless you’re very careful, using the real models directly will cause problems with missing columns, invalid data, etc. It’s clearer to explicitly state exactly what columns and models you will use in the migration.


回答 1

您还可以使用直接SQL参见(Alembic操作参考),如以下示例所示:

from alembic import op

# revision identifiers, used by Alembic.
revision = '1ce7873ac4ced2'
down_revision = '1cea0ac4ced2'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands made by andrew ###
    op.execute('UPDATE STOCK SET IN_STOCK = -1 WHERE IN_STOCK IS NULL')
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

You can also use direct SQL see (Alembic Operation Reference) as in the following example:

from alembic import op

# revision identifiers, used by Alembic.
revision = '1ce7873ac4ced2'
down_revision = '1cea0ac4ced2'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands made by andrew ###
    op.execute('UPDATE STOCK SET IN_STOCK = -1 WHERE IN_STOCK IS NULL')
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

回答 2

我建议使用临时表来使用SQLAlchemy核心语句(如官方文档中所述),因为它允许使用不可知论的SQL和pythonic编写,并且也是独立的。对于迁移脚本,SQLAlchemy Core是两全其美的。

这是概念的示例:

from sqlalchemy.sql import table, column
from sqlalchemy import String
from alembic import op

account = table('account',
    column('name', String)
)
op.execute(
    account.update().\\
    where(account.c.name==op.inline_literal('account 1')).\\
        values({'name':op.inline_literal('account 2')})
        )

# If insert is required
from sqlalchemy.sql import insert
from sqlalchemy import orm

session = orm.Session(bind=bind)
bind = op.get_bind()

data = {
    "name": "John",
}
ret = session.execute(insert(account).values(data))
# for use in other insert calls
account_id = ret.lastrowid

I recommend using SQLAlchemy core statements using an ad-hoc table, as detailed in the official documentation, because it allows the use of agnostic SQL and pythonic writing and is also self-contained. SQLAlchemy Core is the best of both worlds for migration scripts.

Here is an example of the concept:

from sqlalchemy.sql import table, column
from sqlalchemy import String
from alembic import op

account = table('account',
    column('name', String)
)
op.execute(
    account.update().\\
    where(account.c.name==op.inline_literal('account 1')).\\
        values({'name':op.inline_literal('account 2')})
        )

# If insert is required
from sqlalchemy.sql import insert
from sqlalchemy import orm

session = orm.Session(bind=bind)
bind = op.get_bind()

data = {
    "name": "John",
}
ret = session.execute(insert(account).values(data))
# for use in other insert calls
account_id = ret.lastrowid

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。