数据库并发问题

这是一个典型的数据库并发问题,称为**“竞态条件”,具体表现为“检查后插入”**操作的非原子性。
我们来分解一下整个过程:

  1. 根本原因psycopg.errors.UniqueViolation
    • 数据库中有一个名为 uq_person_user_name 的唯一约束,它要求 (provider, provider_id, name) 这三个字段的组合必须是唯一的。
    • 错误信息显示,你的程序试图插入一条 (tmdb, 1210414, 于海) 的记录,但这个组合已经存在于数据库中了。
  2. 竞态条件的发生场景
    • 假设有两个并发的任务(线程/进程)几乎同时处理同一个演员“于海”的信息。
    • 任务 A 执行到 session.exec(select(Person)...)。此时数据库里还没有“于海”,所以查询结果为 None
    • 任务 B 也执行到 session.exec(select(Person)...)。由于任务 A 还没有执行 INSERT,任务 B 的查询结果也为 None
    • 任务 A 判断 if not person: 为真,于是创建 Person 对象,并执行 session.add(person)session.flush()。数据库成功插入新记录。
    • 任务 B 接着也判断 if not person: 为真(因为它之前查询时也是 None),于是也创建 Person 对象,并执行 session.add(person)session.flush()
    • 此时,数据库收到了任务 B 的 INSERT 请求,发现 (tmdb, 1210414, 于海) 已经存在(被任务 A 插入了),于是抛出了 UniqueViolation 异常。
  3. 直接导致程序崩溃的原因sqlalchemy.exc.PendingRollbackError
    • UniqueViolation 异常发生时,SQLAlchemy 为了保证数据一致性,会自动将当前会话的事务标记为**“待回滚”**。
    • 你的代码没有正确处理这个异常,程序继续往下执行。
    • 当后续代码(如 media_file.full_path)再次尝试使用这个已经“损坏”的 session 与数据库交互时,SQLAlchemy 会拒绝操作并抛出 PendingRollbackError,告诉你:“这个会话的事务因为之前的错误已经回滚了,你必须先调用 Session.rollback() 才能开始新的事务。”
      总结:核心问题是“检查-然后-插入”的操作在并发环境下不是原子的,导致重复插入。而程序崩溃是因为没有处理数据库异常,导致会话状态异常。

解决方案

解决这个问题的关键是让“查找或创建”这个操作变成原子操作。这里提供两种主流且有效的解决方案。

方案一:使用数据库级别的 UPSERT (推荐)

这是最优雅、最高效的解决方案。PostgreSQL 提供了 INSERT ... ON CONFLICT 语法,可以在一个原子操作中完成“插入,如果冲突则什么都不做/更新”。
你需要修改代码,使用 SQLAlchemy 的 on_conflict_do_nothingon_conflict_do_update
修改后的代码 (_upsert_credits 部分):

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
# ... (在 _upsert_credits 函数内部)
# 1. 提取Person关键信息
name = self._get_attr(c, "name")
original_name = self._get_attr(c, "original_name")
provider_id = self._get_attr(c, "provider_id")
purl = self._get_attr(c, "image_url")
if not name:
    continue
# 2. 构造 INSERT ... ON CONFLICT DO NOTHING 语句
# 这会尝试插入,如果因为唯一约束冲突,则什么都不做
stmt = insert(Person).values(
    provider=provider,
    provider_id=provider_id,
    name=name,
    original_name=original_name,
    profile_url=purl
).on_conflict_do_nothing(
    index_elements=['provider', 'provider_id', 'name'] # 指定唯一约束的列
)
# 3. 执行语句并获取结果
result = session.execute(stmt)
# session.flush() 在 execute 时已经隐式执行了
# 4. 如果没有发生冲突(即插入了新数据),result.rowcount 会是 1
# 如果发生了冲突(数据已存在),result.rowcount 会是 0
# 无论哪种情况,我们都需要从数据库中获取这个 person 对象
if result.rowcount == 1:
    # 新插入的,需要重新获取以获得数据库生成的ID等信息
    person = session.exec(select(Person).where(
        Person.provider_id == provider_id,
        Person.name == name,
        Person.provider == provider
    )).first()
else:
    # 已存在的,也需要获取出来
    person = session.exec(select(Person).where(
        Person.provider_id == provider_id,
        Person.name == name,
        Person.provider == provider
    )).first()
    # 如果需要,可以在这里更新已存在记录的字段
    # 例如,更新 profile_url (如果之前是空的)
    if person and not person.profile_url and purl:
        person.profile_url = purl
        # session.add(person) # 对于已存在的对象,修改属性会自动被追踪,无需再次add
# 后续处理Credit...

优点

  • 原子性:完全避免了竞态条件。
  • 高效:通常只需一次数据库往返。
  • 代码清晰:意图明确,是处理此类问题的标准做法。

方案二:捕获异常并重试(“先插入,后查询”模式)

这是一种更通用的模式,不依赖特定数据库特性,但会有额外的数据库开销。
修改后的代码 (_upsert_credits 部分):

from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
# ... (在 _upsert_credits 函数内部)
# 1. 提取Person关键信息
name = self._get_attr(c, "name")
original_name = self._get_attr(c, "original_name")
provider_id = self._get_attr(c, "provider_id")
purl = self._get_attr(c, "image_url")
if not name:
    continue
# 2. 先尝试查询
person = session.exec(select(Person).where(
    Person.provider_id == provider_id,
    Person.name == name,
    Person.provider == provider
)).first()
# 3. 如果查询不到,再尝试插入
if not person:
    new_person = Person(
        provider=provider,
        provider_id=provider_id,
        name=name,
        original_name=original_name,
        profile_url=purl
    )
    try:
        session.add(new_person)
        session.flush()  # 尝试插入
        person = new_person # 插入成功,使用新对象
    except IntegrityError: # 捕获唯一约束违反的错误
        session.rollback() # **关键:必须回滚这个失败的事务**
        # 发生冲突,说明别的线程已经插入了,现在重新查询一次
        person = session.exec(select(Person).where(
            Person.provider_id == provider_id,
            Person.name == name,
            Person.provider == provider
        )).first()
        # 此时 person 一定能找到
        if person and not person.profile_url and purl:
            person.profile_url = purl
# 后续处理Credit...

优点

  • 通用性强:适用于任何支持唯一约束的数据库。
  • 逻辑直观:容易理解。
    缺点
  • 性能稍差:在并发冲突时,会产生额外的数据库操作(一次失败的 INSERT + 一次 ROLLBACK + 一次 SELECT)。
  • 必须正确处理 session.rollback():否则会继续遇到 PendingRollbackError

关于 PendingRollbackError 的额外修复

无论你选择哪种方案,你的上层调用代码都应该能妥善处理数据库异常。从堆栈跟踪看,错误发生在 persist_worker 函数中。你应该在这个函数的顶层加上一个 try...except 块,确保任何未捕获的数据库异常都能正确回滚会话。
示例 (persist_worker 函数修改):

# in services/task/consumers.py
def persist_worker(self, ...):
    # 获取 session
    with get_session() as session: # 假设你使用了类似 context manager 的方式管理 session
        try:
            svc.apply_metadata(session, media_file, metadata=contract_payload, ...)
            # 如果一切顺利,提交事务
            session.commit()
        except Exception as e:
            # 发生任何异常,都回滚事务
            logger.error(f"持久化任务 {task_id} 执行失败: {e}", exc_info=True)
            session.rollback()
            # 可以选择重新抛出异常或根据业务逻辑处理
            # raise

总结建议

  • 首选方案一 (UPSERT),它是最现代、最可靠的解决方案。
  • 如果项目环境限制或不想使用特定数据库方言,方案二(捕获异常) 也是一个可行的、被广泛使用的备选方案。
  • 务必在更高层级添加 try...except 块来管理事务的 commitrollback,这是保证数据一致性和避免 PendingRollbackError 的根本。

选择方案一:

修改后的代码:

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert # <-- 关键导入


def _upsert_credits(self, session, user_id: int, core_id: int, credits, provider: Optional[str]) -> None:
    """
    使用数据库级别的 UPSERT 操作来原子化地处理 Person 和 Credit 的创建/更新,
    彻底解决并发环境下的竞态条件问题。
    """
    if not credits:
        return

    for c in credits:
        try:
            if not c:
                continue

            # --- 1. 处理 Person ---
            name = self._get_attr(c, "name")
            original_name = self._get_attr(c, "original_name")
            provider_id = self._get_attr(c, "provider_id")
            purl = self._get_attr(c, "image_url")
            if not name:
                continue

            # 使用 UPSERT (INSERT ... ON CONFLICT DO NOTHING) 来原子化地创建 Person
            # 这会尝试插入,如果 (provider, provider_id, name) 冲突,则什么都不做
            stmt_person = insert(Person).values(
                provider=provider,
                provider_id=provider_id,
                name=name,
                original_name=original_name,
                profile_url=purl
            ).on_conflict_do_nothing(
                index_elements=['provider', 'provider_id', 'name']  # 指定唯一约束的列
            )
            session.execute(stmt_person)

            # 无论是否插入了新数据,都重新查询一次以确保获取到 person 对象
            person = session.exec(select(Person).where(
                Person.provider_id == provider_id,
                Person.name == name,
                Person.provider == provider
            )).first()

            # 如果此时 person 仍然为 None,说明有其他严重问题(如 provider_id 为 None)
            if not person:
                logger.error(f"UPSERT Person 后仍无法获取到记录 (name={name}, provider={provider}, provider_id={provider_id})")
                continue

            # 更新可能缺失的字段 (例如,profile_url)
            # 这部分逻辑在 person 对象获取后执行,确保总是作用于一个有效的实例
            person.original_name = original_name or person.original_name
            if not person.profile_url and purl:
                person.profile_url = purl

            # --- 2. 处理 Credit ---
            # 处理 Enum 类型
            c_type = self._get_attr(c, "type")
            if hasattr(c_type, "value"):
                role_type = c_type.value
            else:
                role_type = c_type
            role = "cast" if role_type == "actor" else "crew"
            role = "guest" if self._get_attr(c, "is_flying") else role

            character = self._get_attr(c, "character") if role == "cast" else None
            job = role_type
            order = self._get_attr(c, "order")

            # 使用 UPSERT (INSERT ... ON CONFLICT DO UPDATE) 来原子化地创建/更新 Credit
            # 如果 (user_id, core_id, person_id, role, job) 冲突,则更新 character 和 order
            stmt_credit = insert(Credit).values(
                user_id=user_id,
                core_id=core_id,
                person_id=person.id,
                role=role,
                character=character,
                job=job,
                order=order
            ).on_conflict_do_update(
                index_elements=['user_id', 'core_id', 'person_id', 'role', 'job'], # 假设这是 Credit 表的唯一/候选键
                set_={
                    'character': character,
                    'order': order
                }
            )
            session.execute(stmt_credit)

        except Exception as e:
            logger.error(f"处理 Person/Credit 失败(name={name if 'name' in locals() else 'N/A'}, provider={provider}): {str(e)}", exc_info=True)
            # 关键:发生任何异常时回滚当前事务,避免会话状态污染
            session.rollback()
            continue  # 跳过当前记录,处理下一条

二、问题 2:所有关联表报 “core_id=2 不存在于 media_core”

错误根源

日志显示 “电影中更新了 media_file.core_id: 2”,但后续关联表插入时core_id=2不存在,核心原因是:

MediaCore的插入被后续的session.rollback()回滚了

具体流程:

  1. _apply_movie_detail中,插入MediaCore(core_id=2)并执行session.flush()(仅将数据写入数据库内存,未提交);
  2. 后续调用_upsert_artworks处理图片时,因问题 1 的 UPSERT 错误触发session.rollback()
  3. rollback()会撤销当前事务中所有未提交的操作(包括之前flush()MediaCore记录);
  4. 后续_upsert_credits_upsert_genres等方法使用core_id=2插入关联表时,因media_core表中无此记录,触发外键约束错误。

解决方案:调整事务回滚策略,确保 MediaCore 不被回滚

当前代码在循环处理Artwork/Credit时,一旦单个记录报错就调用session.rollback(),会导致整个事务的所有操作(包括之前的 MediaCore 插入)被撤销。需修改回滚逻辑,避免 “局部错误影响全局”。

步骤 1:修改_upsert_artworks等方法的回滚策略

删除循环内的session.rollback(),改用 “保存点(Savepoint)” 实现局部回滚(仅回滚当前错误的记录,不影响之前的 MediaCore 插入)。

_upsert_artworks为例,修改后代码如下:

def _upsert_artworks(self, session, user_id: int, core_id: int, provider: Optional[str], artworks) -> None:
    if not artworks:
        return

    for artwork in artworks:
        # 1. 创建保存点:用于局部回滚
        savepoint = session.begin_nested()
        try:
            # (原有提取属性、判断逻辑不变)
            a_type = self._get_attr(artwork, "type")
            a_url = self._get_attr(artwork, "url")
            # ... 省略其他属性提取 ...

            if a_preferred:
                # (原有UPSERT逻辑不变)
                stmt = insert(Artwork).values(...).on_conflict_do_update(...)
                result = session.execute(stmt)
                # ... 省略后续更新其他Artwork的逻辑 ...
            else:
                # (原有非首选图片UPSERT逻辑不变)
                stmt = insert(Artwork).values(...)
                session.execute(stmt)

            # 2. 无错误则释放保存点(提交局部操作)
            savepoint.commit()
        except Exception as e:
            logger.error(f"处理 Artwork 失败(URL: {a_url if 'a_url' in locals() else 'N/A'}): {str(e)}", exc_info=True)
            # 3. 仅回滚当前保存点的操作(不影响MediaCore)
            savepoint.rollback()
            continue
步骤 2:验证 MediaCore 插入的完整性

确保_apply_movie_detailMediaCore的插入无隐藏错误(如必填字段缺失导致插入失败):

  1. 检查MediaCore模型的必填字段(如user_idkindtitle)是否在插入时都有值;

  2. session.add(core)

    后添加日志,确认

    core.id

    已生成(

    flush()

    后应能获取

    core.id

    ):

    # 在_apply_movie_detail的session.add(core)和session.flush()后添加
    session.add(core)
    session.flush()
    logger.info(f"MediaCore插入成功:core_id={core.id}, user_id={media_file.user_id}, title={core.title}")
步骤 3:关联表操作前增加 core_id 有效性检查(可选)

为避免无效core_id触发外键错误,可在_upsert_credits_upsert_genres等方法开头,增加MediaCore存在性检查:

def _upsert_credits(self, session, user_id: int, core_id: int, credits, provider: Optional[str]) -> None:
    if not credits:
        return
    # 增加core_id有效性检查
    core_exists = session.exec(select(MediaCore.id).where(
        MediaCore.id == core_id, 
        MediaCore.user_id == user_id
    )).first()
    if not core_exists:
        logger.error(f"core_id={core_id} 不存在于media_core表(user_id={user_id}),跳过Credits处理")
        return
    # (原有处理Person和Credit的逻辑不变)

三、最终验证步骤

  1. 执行 SQL 为Artwork表添加复合唯一索引;

  2. 修改所有_upsert_*方法的回滚逻辑为 “保存点局部回滚”;

  3. 重新运行任务,观察日志:

    • 确认MediaCore插入成功(日志显示core_id=xx);
    • 确认Artwork的 UPSERT 不再报唯一约束错误;
    • 确认CreditMediaCoreGenre等关联表不再报外键错误。

通过以上修改,可解决当前的两个核心错误,同时保证并发场景下数据的一致性(UPSERT 的原子性)和事务的安全性(局部错误不影响全局)。


文章作者: Mealsee
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Mealsee !
 上一篇
2025-12-21 Mealsee
下一篇 
2025-12-06 Mealsee
  目录