数据库并发问题
这是一个典型的数据库并发问题,称为**“竞态条件”,具体表现为“检查后插入”**操作的非原子性。
我们来分解一下整个过程:
- 根本原因:
psycopg.errors.UniqueViolation- 数据库中有一个名为
uq_person_user_name的唯一约束,它要求(provider, provider_id, name)这三个字段的组合必须是唯一的。 - 错误信息显示,你的程序试图插入一条
(tmdb, 1210414, 于海)的记录,但这个组合已经存在于数据库中了。
- 数据库中有一个名为
- 竞态条件的发生场景:
- 假设有两个并发的任务(线程/进程)几乎同时处理同一个演员“于海”的信息。
- 任务 A 执行到
session.exec(select(Person)...)。此时数据库里还没有“于海”,所以查询结果为None。 - 任务 B 也执行到
session.exec(select(Person)...)。由于任务 A 还没有执行INSERT,任务 B 的查询结果也为None。 - 任务 A 判断
if not person:为真,于是创建Person对象,并执行session.add(person)和session.flush()。数据库成功插入新记录。 - 任务 B 接着也判断
if not person:为真(因为它之前查询时也是None),于是也创建Person对象,并执行session.add(person)和session.flush()。 - 此时,数据库收到了任务 B 的
INSERT请求,发现(tmdb, 1210414, 于海)已经存在(被任务 A 插入了),于是抛出了UniqueViolation异常。
- 直接导致程序崩溃的原因:
sqlalchemy.exc.PendingRollbackError- 当
UniqueViolation异常发生时,SQLAlchemy 为了保证数据一致性,会自动将当前会话的事务标记为**“待回滚”**。 - 你的代码没有正确处理这个异常,程序继续往下执行。
- 当后续代码(如
media_file.full_path)再次尝试使用这个已经“损坏”的session与数据库交互时,SQLAlchemy 会拒绝操作并抛出PendingRollbackError,告诉你:“这个会话的事务因为之前的错误已经回滚了,你必须先调用Session.rollback()才能开始新的事务。”
总结:核心问题是“检查-然后-插入”的操作在并发环境下不是原子的,导致重复插入。而程序崩溃是因为没有处理数据库异常,导致会话状态异常。
- 当
解决方案
解决这个问题的关键是让“查找或创建”这个操作变成原子操作。这里提供两种主流且有效的解决方案。
方案一:使用数据库级别的 UPSERT (推荐)
这是最优雅、最高效的解决方案。PostgreSQL 提供了 INSERT ... ON CONFLICT 语法,可以在一个原子操作中完成“插入,如果冲突则什么都不做/更新”。
你需要修改代码,使用 SQLAlchemy 的 on_conflict_do_nothing 或 on_conflict_do_update。
修改后的代码 (_upsert_credits 部分):
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
# ... (在 _upsert_credits 函数内部)
# 1. 提取Person关键信息
name = self._get_attr(c, "name")
original_name = self._get_attr(c, "original_name")
provider_id = self._get_attr(c, "provider_id")
purl = self._get_attr(c, "image_url")
if not name:
continue
# 2. 构造 INSERT ... ON CONFLICT DO NOTHING 语句
# 这会尝试插入,如果因为唯一约束冲突,则什么都不做
stmt = insert(Person).values(
provider=provider,
provider_id=provider_id,
name=name,
original_name=original_name,
profile_url=purl
).on_conflict_do_nothing(
index_elements=['provider', 'provider_id', 'name'] # 指定唯一约束的列
)
# 3. 执行语句并获取结果
result = session.execute(stmt)
# session.flush() 在 execute 时已经隐式执行了
# 4. 如果没有发生冲突(即插入了新数据),result.rowcount 会是 1
# 如果发生了冲突(数据已存在),result.rowcount 会是 0
# 无论哪种情况,我们都需要从数据库中获取这个 person 对象
if result.rowcount == 1:
# 新插入的,需要重新获取以获得数据库生成的ID等信息
person = session.exec(select(Person).where(
Person.provider_id == provider_id,
Person.name == name,
Person.provider == provider
)).first()
else:
# 已存在的,也需要获取出来
person = session.exec(select(Person).where(
Person.provider_id == provider_id,
Person.name == name,
Person.provider == provider
)).first()
# 如果需要,可以在这里更新已存在记录的字段
# 例如,更新 profile_url (如果之前是空的)
if person and not person.profile_url and purl:
person.profile_url = purl
# session.add(person) # 对于已存在的对象,修改属性会自动被追踪,无需再次add
# 后续处理Credit...
优点:
- 原子性:完全避免了竞态条件。
- 高效:通常只需一次数据库往返。
- 代码清晰:意图明确,是处理此类问题的标准做法。
方案二:捕获异常并重试(“先插入,后查询”模式)
这是一种更通用的模式,不依赖特定数据库特性,但会有额外的数据库开销。
修改后的代码 (_upsert_credits 部分):
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
# ... (在 _upsert_credits 函数内部)
# 1. 提取Person关键信息
name = self._get_attr(c, "name")
original_name = self._get_attr(c, "original_name")
provider_id = self._get_attr(c, "provider_id")
purl = self._get_attr(c, "image_url")
if not name:
continue
# 2. 先尝试查询
person = session.exec(select(Person).where(
Person.provider_id == provider_id,
Person.name == name,
Person.provider == provider
)).first()
# 3. 如果查询不到,再尝试插入
if not person:
new_person = Person(
provider=provider,
provider_id=provider_id,
name=name,
original_name=original_name,
profile_url=purl
)
try:
session.add(new_person)
session.flush() # 尝试插入
person = new_person # 插入成功,使用新对象
except IntegrityError: # 捕获唯一约束违反的错误
session.rollback() # **关键:必须回滚这个失败的事务**
# 发生冲突,说明别的线程已经插入了,现在重新查询一次
person = session.exec(select(Person).where(
Person.provider_id == provider_id,
Person.name == name,
Person.provider == provider
)).first()
# 此时 person 一定能找到
if person and not person.profile_url and purl:
person.profile_url = purl
# 后续处理Credit...
优点:
- 通用性强:适用于任何支持唯一约束的数据库。
- 逻辑直观:容易理解。
缺点: - 性能稍差:在并发冲突时,会产生额外的数据库操作(一次失败的
INSERT+ 一次ROLLBACK+ 一次SELECT)。 - 必须正确处理
session.rollback():否则会继续遇到PendingRollbackError。
关于 PendingRollbackError 的额外修复
无论你选择哪种方案,你的上层调用代码都应该能妥善处理数据库异常。从堆栈跟踪看,错误发生在 persist_worker 函数中。你应该在这个函数的顶层加上一个 try...except 块,确保任何未捕获的数据库异常都能正确回滚会话。
示例 (persist_worker 函数修改):
# in services/task/consumers.py
def persist_worker(self, ...):
# 获取 session
with get_session() as session: # 假设你使用了类似 context manager 的方式管理 session
try:
svc.apply_metadata(session, media_file, metadata=contract_payload, ...)
# 如果一切顺利,提交事务
session.commit()
except Exception as e:
# 发生任何异常,都回滚事务
logger.error(f"持久化任务 {task_id} 执行失败: {e}", exc_info=True)
session.rollback()
# 可以选择重新抛出异常或根据业务逻辑处理
# raise
总结建议:
- 首选方案一 (
UPSERT),它是最现代、最可靠的解决方案。 - 如果项目环境限制或不想使用特定数据库方言,方案二(捕获异常) 也是一个可行的、被广泛使用的备选方案。
- 务必在更高层级添加
try...except块来管理事务的commit和rollback,这是保证数据一致性和避免PendingRollbackError的根本。
选择方案一:
修改后的代码:
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert # <-- 关键导入
def _upsert_credits(self, session, user_id: int, core_id: int, credits, provider: Optional[str]) -> None:
"""
使用数据库级别的 UPSERT 操作来原子化地处理 Person 和 Credit 的创建/更新,
彻底解决并发环境下的竞态条件问题。
"""
if not credits:
return
for c in credits:
try:
if not c:
continue
# --- 1. 处理 Person ---
name = self._get_attr(c, "name")
original_name = self._get_attr(c, "original_name")
provider_id = self._get_attr(c, "provider_id")
purl = self._get_attr(c, "image_url")
if not name:
continue
# 使用 UPSERT (INSERT ... ON CONFLICT DO NOTHING) 来原子化地创建 Person
# 这会尝试插入,如果 (provider, provider_id, name) 冲突,则什么都不做
stmt_person = insert(Person).values(
provider=provider,
provider_id=provider_id,
name=name,
original_name=original_name,
profile_url=purl
).on_conflict_do_nothing(
index_elements=['provider', 'provider_id', 'name'] # 指定唯一约束的列
)
session.execute(stmt_person)
# 无论是否插入了新数据,都重新查询一次以确保获取到 person 对象
person = session.exec(select(Person).where(
Person.provider_id == provider_id,
Person.name == name,
Person.provider == provider
)).first()
# 如果此时 person 仍然为 None,说明有其他严重问题(如 provider_id 为 None)
if not person:
logger.error(f"UPSERT Person 后仍无法获取到记录 (name={name}, provider={provider}, provider_id={provider_id})")
continue
# 更新可能缺失的字段 (例如,profile_url)
# 这部分逻辑在 person 对象获取后执行,确保总是作用于一个有效的实例
person.original_name = original_name or person.original_name
if not person.profile_url and purl:
person.profile_url = purl
# --- 2. 处理 Credit ---
# 处理 Enum 类型
c_type = self._get_attr(c, "type")
if hasattr(c_type, "value"):
role_type = c_type.value
else:
role_type = c_type
role = "cast" if role_type == "actor" else "crew"
role = "guest" if self._get_attr(c, "is_flying") else role
character = self._get_attr(c, "character") if role == "cast" else None
job = role_type
order = self._get_attr(c, "order")
# 使用 UPSERT (INSERT ... ON CONFLICT DO UPDATE) 来原子化地创建/更新 Credit
# 如果 (user_id, core_id, person_id, role, job) 冲突,则更新 character 和 order
stmt_credit = insert(Credit).values(
user_id=user_id,
core_id=core_id,
person_id=person.id,
role=role,
character=character,
job=job,
order=order
).on_conflict_do_update(
index_elements=['user_id', 'core_id', 'person_id', 'role', 'job'], # 假设这是 Credit 表的唯一/候选键
set_={
'character': character,
'order': order
}
)
session.execute(stmt_credit)
except Exception as e:
logger.error(f"处理 Person/Credit 失败(name={name if 'name' in locals() else 'N/A'}, provider={provider}): {str(e)}", exc_info=True)
# 关键:发生任何异常时回滚当前事务,避免会话状态污染
session.rollback()
continue # 跳过当前记录,处理下一条
二、问题 2:所有关联表报 “core_id=2 不存在于 media_core”
错误根源
日志显示 “电影中更新了 media_file.core_id: 2”,但后续关联表插入时core_id=2不存在,核心原因是:
MediaCore的插入被后续的session.rollback()回滚了。
具体流程:
- 在
_apply_movie_detail中,插入MediaCore(core_id=2)并执行session.flush()(仅将数据写入数据库内存,未提交); - 后续调用
_upsert_artworks处理图片时,因问题 1 的 UPSERT 错误触发session.rollback(); rollback()会撤销当前事务中所有未提交的操作(包括之前flush()的MediaCore记录);- 后续
_upsert_credits、_upsert_genres等方法使用core_id=2插入关联表时,因media_core表中无此记录,触发外键约束错误。
解决方案:调整事务回滚策略,确保 MediaCore 不被回滚
当前代码在循环处理Artwork/Credit时,一旦单个记录报错就调用session.rollback(),会导致整个事务的所有操作(包括之前的 MediaCore 插入)被撤销。需修改回滚逻辑,避免 “局部错误影响全局”。
步骤 1:修改_upsert_artworks等方法的回滚策略
删除循环内的session.rollback(),改用 “保存点(Savepoint)” 实现局部回滚(仅回滚当前错误的记录,不影响之前的 MediaCore 插入)。
以_upsert_artworks为例,修改后代码如下:
def _upsert_artworks(self, session, user_id: int, core_id: int, provider: Optional[str], artworks) -> None:
if not artworks:
return
for artwork in artworks:
# 1. 创建保存点:用于局部回滚
savepoint = session.begin_nested()
try:
# (原有提取属性、判断逻辑不变)
a_type = self._get_attr(artwork, "type")
a_url = self._get_attr(artwork, "url")
# ... 省略其他属性提取 ...
if a_preferred:
# (原有UPSERT逻辑不变)
stmt = insert(Artwork).values(...).on_conflict_do_update(...)
result = session.execute(stmt)
# ... 省略后续更新其他Artwork的逻辑 ...
else:
# (原有非首选图片UPSERT逻辑不变)
stmt = insert(Artwork).values(...)
session.execute(stmt)
# 2. 无错误则释放保存点(提交局部操作)
savepoint.commit()
except Exception as e:
logger.error(f"处理 Artwork 失败(URL: {a_url if 'a_url' in locals() else 'N/A'}): {str(e)}", exc_info=True)
# 3. 仅回滚当前保存点的操作(不影响MediaCore)
savepoint.rollback()
continue
步骤 2:验证 MediaCore 插入的完整性
确保_apply_movie_detail中MediaCore的插入无隐藏错误(如必填字段缺失导致插入失败):
检查
MediaCore模型的必填字段(如user_id、kind、title)是否在插入时都有值;在
session.add(core)后添加日志,确认
core.id已生成(
flush()后应能获取
core.id):
# 在_apply_movie_detail的session.add(core)和session.flush()后添加 session.add(core) session.flush() logger.info(f"MediaCore插入成功:core_id={core.id}, user_id={media_file.user_id}, title={core.title}")
步骤 3:关联表操作前增加 core_id 有效性检查(可选)
为避免无效core_id触发外键错误,可在_upsert_credits、_upsert_genres等方法开头,增加MediaCore存在性检查:
def _upsert_credits(self, session, user_id: int, core_id: int, credits, provider: Optional[str]) -> None:
if not credits:
return
# 增加core_id有效性检查
core_exists = session.exec(select(MediaCore.id).where(
MediaCore.id == core_id,
MediaCore.user_id == user_id
)).first()
if not core_exists:
logger.error(f"core_id={core_id} 不存在于media_core表(user_id={user_id}),跳过Credits处理")
return
# (原有处理Person和Credit的逻辑不变)
三、最终验证步骤
执行 SQL 为
Artwork表添加复合唯一索引;修改所有
_upsert_*方法的回滚逻辑为 “保存点局部回滚”;重新运行任务,观察日志:
- 确认
MediaCore插入成功(日志显示core_id=xx); - 确认
Artwork的 UPSERT 不再报唯一约束错误; - 确认
Credit、MediaCoreGenre等关联表不再报外键错误。
- 确认
通过以上修改,可解决当前的两个核心错误,同时保证并发场景下数据的一致性(UPSERT 的原子性)和事务的安全性(局部错误不影响全局)。