问题:SQLAlchemy版本控制关心类的导入顺序
我在这里遵循指南:
http://www.sqlalchemy.org/docs/orm/examples.html?highlight=versioning#versioned-objects
并遇到了一个问题。我的关系定义如下:
generic_ticker = relation('MyClass', backref=backref("stuffs"))
使用字符串,因此它不在乎模型模块的导入顺序。这一切都正常工作,但是当我使用版本控制元时,出现以下错误:
sqlalchemy.exc.InvalidRequestError:初始化映射程序Mapper | MyClass | stuffs时,表达式’Trader’找不到名称(“名称’MyClass’未定义”)。如果这是一个类名,请考虑在定义了两个从属类之后,将这个Relationship()添加到类中。
我跟踪到以下错误:
File "/home/nick/workspace/gm3/gm3/lib/history_meta.py", line 90, in __init__
mapper = class_mapper(cls)
File "/home/nick/venv/tg2env/lib/python2.6/site-packages/sqlalchemy/orm/util.py", line 622, in class_mapper
mapper = mapper.compile()
class VersionedMeta(DeclarativeMeta):
def __init__(cls, classname, bases, dict_):
DeclarativeMeta.__init__(cls, classname, bases, dict_)
try:
mapper = class_mapper(cls)
_history_mapper(mapper)
except UnmappedClassError:
pass
我通过尝试一下来解决了这个问题:将所有东西都放入lambda中,然后在所有导入操作完成后再运行它们。这有效,但似乎有点垃圾,如何解决此问题的任何想法是更好的方法?
谢谢!
更新资料
问题实际上与导入顺序无关。设计版本控制示例时,使映射器需要在每个版本控制类的构造函数中进行编译。当尚未定义相关的类时,编译将失败。如果是循环关系,则无法通过更改映射类的定义顺序来使其工作。
更新2
如上述更新所述(我不知道您可以在此处编辑其他人的帖子:))这很可能是由于循环引用。在这种情况下,可能有人会发现我的黑客很有用(我将其与turbogears一起使用)(替换VersionedMeta并在history_meta中全局添加create_mappers)
create_mappers = []
class VersionedMeta(DeclarativeMeta):
def __init__(cls, classname, bases, dict_):
DeclarativeMeta.__init__(cls, classname, bases, dict_)
#I added this code in as it was crashing otherwise
def make_mapper():
try:
mapper = class_mapper(cls)
_history_mapper(mapper)
except UnmappedClassError:
pass
create_mappers.append(lambda: make_mapper())
然后,您可以在模型__init__.py中执行以下操作
# Import your model modules here.
from myproj.lib.history_meta import create_mappers
from myproj.model.misc import *
from myproj.model.actor import *
from myproj.model.stuff1 import *
from myproj.model.instrument import *
from myproj.model.stuff import *
#setup the history
[func() for func in create_mappers]
这样,仅在定义了所有类之后,它才创建映射器。
更新3 略微无关,但在某些情况下我遇到了重复的主键错误(一次对同一对象进行2次更改)。我的解决方法是添加一个新的主自动增量键。当然,使用mysql不能超过1个,因此我不得不取消对用于创建历史表的现有内容的主键。查看我的整体代码(包括hist_id并摆脱外键约束):
"""Stolen from the offical sqlalchemy recpies
"""
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, Integer
from sqlalchemy.orm.interfaces import SessionExtension
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.types import DateTime
import datetime
from sqlalchemy.orm.session import Session
def col_references_table(col, table):
for fk in col.foreign_keys:
if fk.references(table):
return True
return False
def _history_mapper(local_mapper):
cls = local_mapper.class_
# set the "active_history" flag
# on on column-mapped attributes so that the old version
# of the info is always loaded (currently sets it on all attributes)
for prop in local_mapper.iterate_properties:
getattr(local_mapper.class_, prop.key).impl.active_history = True
super_mapper = local_mapper.inherits
super_history_mapper = getattr(cls, '__history_mapper__', None)
polymorphic_on = None
super_fks = []
if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
cols = []
for column in local_mapper.local_table.c:
if column.name == 'version':
continue
col = column.copy()
col.unique = False
#don't auto increment stuff from the normal db
if col.autoincrement:
col.autoincrement = False
#sqllite falls over with auto incrementing keys if we have a composite key
if col.primary_key:
col.primary_key = False
if super_mapper and col_references_table(column, super_mapper.local_table):
super_fks.append((col.key, list(super_history_mapper.base_mapper.local_table.primary_key)[0]))
cols.append(col)
if column is local_mapper.polymorphic_on:
polymorphic_on = col
#if super_mapper:
# super_fks.append(('version', super_history_mapper.base_mapper.local_table.c.version))
cols.append(Column('hist_id', Integer, primary_key=True, autoincrement=True))
cols.append(Column('version', Integer))
cols.append(Column('changed', DateTime, default=datetime.datetime.now))
if super_fks:
cols.append(ForeignKeyConstraint(*zip(*super_fks)))
table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
*cols, mysql_engine='InnoDB')
else:
# single table inheritance. take any additional columns that may have
# been added and add them to the history table.
for column in local_mapper.local_table.c:
if column.key not in super_history_mapper.local_table.c:
col = column.copy()
super_history_mapper.local_table.append_column(col)
table = None
if super_history_mapper:
bases = (super_history_mapper.class_,)
else:
bases = local_mapper.base_mapper.class_.__bases__
versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})
m = mapper(
versioned_cls,
table,
inherits=super_history_mapper,
polymorphic_on=polymorphic_on,
polymorphic_identity=local_mapper.polymorphic_identity
)
cls.__history_mapper__ = m
if not super_history_mapper:
cls.version = Column('version', Integer, default=1, nullable=False)
create_mappers = []
class VersionedMeta(DeclarativeMeta):
def __init__(cls, classname, bases, dict_):
DeclarativeMeta.__init__(cls, classname, bases, dict_)
#I added this code in as it was crashing otherwise
def make_mapper():
try:
mapper = class_mapper(cls)
_history_mapper(mapper)
except UnmappedClassError:
pass
create_mappers.append(lambda: make_mapper())
def versioned_objects(iter):
for obj in iter:
if hasattr(obj, '__history_mapper__'):
yield obj
def create_version(obj, session, deleted = False):
obj_mapper = object_mapper(obj)
history_mapper = obj.__history_mapper__
history_cls = history_mapper.class_
obj_state = attributes.instance_state(obj)
attr = {}
obj_changed = False
for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
if hm.single:
continue
for hist_col in hm.local_table.c:
if hist_col.key == 'version' or hist_col.key == 'changed' or hist_col.key == 'hist_id':
continue
obj_col = om.local_table.c[hist_col.key]
# get the value of the
# attribute based on the MapperProperty related to the
# mapped column. this will allow usage of MapperProperties
# that have a different keyname than that of the mapped column.
try:
prop = obj_mapper.get_property_by_column(obj_col)
except UnmappedColumnError:
# in the case of single table inheritance, there may be
# columns on the mapped table intended for the subclass only.
# the "unmapped" status of the subclass column on the
# base class is a feature of the declarative module as of sqla 0.5.2.
continue
# expired object attributes and also deferred cols might not be in the
# dict. force it to load no matter what by using getattr().
if prop.key not in obj_state.dict:
getattr(obj, prop.key)
a, u, d = attributes.get_history(obj, prop.key)
if d:
attr[hist_col.key] = d[0]
obj_changed = True
elif u:
attr[hist_col.key] = u[0]
else:
# if the attribute had no value.
attr[hist_col.key] = a[0]
obj_changed = True
if not obj_changed:
# not changed, but we have relationships. OK
# check those too
for prop in obj_mapper.iterate_properties:
if isinstance(prop, RelationshipProperty) and \
attributes.get_history(obj, prop.key).has_changes():
obj_changed = True
break
if not obj_changed and not deleted:
return
attr['version'] = obj.version
hist = history_cls()
for key, value in attr.iteritems():
setattr(hist, key, value)
obj.version += 1
session.add(hist)
class VersionedListener(SessionExtension):
def before_flush(self, session, flush_context, instances):
for obj in versioned_objects(session.dirty):
create_version(obj, session)
for obj in versioned_objects(session.deleted):
create_version(obj, session, deleted = True)
回答 0
我通过尝试一下来解决了这个问题:将所有东西都放入lambda中,然后在所有导入操作完成后再运行它们。
大!