FastAPI + Dramatiq + Redis任务队列
单文件整个骨架
以下是 单文件可运行 的 scan 任务队列代码(FastAPI + Dramatiq + Redis),包含生产者、消费者、状态存储,直接运行即可测试,排除所有复杂依赖和模块问题:
from __future__ import annotations
import asyncio
import json
import uuid
import time
from datetime import datetime
from typing import Dict, Optional, List
from enum import Enum
from pydantic import BaseModel
from fastapi import FastAPI, HTTPException
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import Retries, TimeLimit
import redis
# --------------------------
# 1. 基础配置(Redis 连接 + 全局初始化)
# --------------------------
REDIS_URL = "redis://:redis123@localhost:9001/0" # 你的 Redis 配置
app = FastAPI(title="Scan 任务队列 Demo")
# 初始化 Redis Broker(任务队列)
broker = RedisBroker(
url=REDIS_URL,
middleware=[Retries(max_retries=2), TimeLimit(time_limit=60000)] # 1分钟超时
)
dramatiq.set_broker(broker)
# 初始化 Redis 客户端(存储任务状态)
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
# --------------------------
# 2. 任务状态和模型定义
# --------------------------
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
class TaskPriority(int, Enum):
NORMAL = 50
class ScanPayload(BaseModel):
user_id: str
storage_id: str
scan_path: str
class TaskResponse(BaseModel):
task_id: str
status: TaskStatus
created_at: str
class TaskStatusResponse(BaseModel):
task_id: str
status: TaskStatus
payload: Dict
created_at: str
started_at: Optional[str] = None
finished_at: Optional[str] = None
error: Optional[str] = None
# --------------------------
# 3. 任务状态存储(Redis 实现)
# --------------------------
TASK_PREFIX = "scan_task:"
def _new_task_id() -> str:
return str(uuid.uuid4())
def _now_iso() -> str:
return datetime.utcnow().isoformat() + "Z"
async def init_task(task_id: str, payload: Dict) -> None:
"""初始化任务状态到 Redis"""
mapping = {
"task_id": task_id,
"status": TaskStatus.PENDING.value,
"payload": json.dumps(payload),
"created_at": _now_iso(),
"started_at": "",
"finished_at": "",
"error": ""
}
redis_client.hset(f"{TASK_PREFIX}{task_id}", mapping=mapping)
async def update_task_status(task_id: str, status: TaskStatus, **kwargs) -> None:
"""更新任务状态"""
key = f"{TASK_PREFIX}{task_id}"
mapping = {"status": status.value}
mapping.update(kwargs)
if "started_at" not in mapping and status == TaskStatus.RUNNING:
mapping["started_at"] = _now_iso()
if "finished_at" not in mapping and status in [TaskStatus.SUCCESS, TaskStatus.FAILED]:
mapping["finished_at"] = _now_iso()
redis_client.hset(key, mapping=mapping)
async def get_task(task_id: str) -> Optional[Dict]:
"""获取任务状态"""
key = f"{TASK_PREFIX}{task_id}"
data = redis_client.hgetall(key)
if not data:
return None
# 解析 payload
if data["payload"]:
data["payload"] = json.loads(data["payload"])
# 处理空字符串为 None
for k, v in data.items():
if v == "":
data[k] = None
return data
# --------------------------
# 4. Dramatiq 消费者(Scan 任务执行)
# --------------------------
@dramatiq.actor(queue_name="scan", broker=broker)
def scan_worker(task_id: str, payload: Dict) -> None:
"""Scan 任务执行逻辑(修复事件循环问题)"""
print(f"\n✅ 接收到扫描任务:task_id={task_id}, payload={payload}")
# 关键修复:强制创建并绑定事件循环到当前线程
try:
# 尝试获取当前线程的事件循环
loop = asyncio.get_event_loop()
except RuntimeError:
# 没有则创建新循环,并绑定到当前线程
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 更新状态为运行中(现在循环已存在,不会报错)
loop.run_until_complete(update_task_status(task_id, TaskStatus.RUNNING))
print(f"📌 任务 {task_id} 开始执行:扫描路径 {payload['scan_path']}")
# 模拟扫描逻辑(5秒耗时)
time.sleep(5) # 替换为你的真实扫描逻辑(如 eng.scan_storage)
print(f"✅ 任务 {task_id} 执行成功:模拟扫描完成")
# 更新状态为成功
loop.run_until_complete(update_task_status(task_id, TaskStatus.SUCCESS))
except Exception as e:
error_msg = str(e)
print(f"❌ 任务 {task_id} 执行失败:{error_msg}")
# 更新状态为失败
loop.run_until_complete(
update_task_status(task_id, TaskStatus.FAILED, error=error_msg)
)
raise # 触发重试(可选)
# --------------------------
# 5. FastAPI 生产者(创建 Scan 任务)
# --------------------------
@app.post("/tasks/scan", response_model=TaskResponse, summary="创建扫描任务")
async def create_scan_task(payload: ScanPayload):
"""创建并提交扫描任务"""
task_id = _new_task_id()
payload_dict = payload.model_dump()
# 初始化任务状态
await init_task(task_id, payload_dict)
# 提交任务到队列
scan_worker.send(task_id, payload_dict)
print(f"📤 扫描任务已提交:task_id={task_id}, payload={payload_dict}")
return TaskResponse(
task_id=task_id,
status=TaskStatus.PENDING,
created_at=_now_iso()
)
@app.get("/tasks/{task_id}", response_model=TaskStatusResponse, summary="查询任务状态")
async def get_task_status(task_id: str):
"""查询指定任务的执行状态"""
task = await get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail=f"任务 {task_id} 不存在")
return TaskStatusResponse(
task_id=task["task_id"],
status=TaskStatus(task["status"]),
payload=task["payload"],
created_at=task["created_at"],
started_at=task["started_at"],
finished_at=task["finished_at"],
error=task["error"]
)
# --------------------------
# 6. 运行入口
# --------------------------
if __name__ == "__main__":
import uvicorn
# 启动 FastAPI 服务(生产者)
uvicorn.run("scan_task_demo:app", host="0.0.0.0", port=8000, reload=True)
运行步骤(3 步搞定)
1. 安装依赖(若未安装)
pip install fastapi uvicorn dramatiq redis pydantic
2. 启动 Redis(已确认正常)
确保你的 Redis 服务在 localhost:9001 运行,密码 redis123(和代码中 REDIS_URL 一致)。
3. 启动服务和消费者
终端 1:启动 FastAPI 生产者
python scan_task_demo.py
- 服务启动在
http://localhost:8000 - 访问
http://localhost:8000/docs可看到接口文档
终端 2:启动 Dramatiq 消费者
dramatiq scan_task_demo:scan_worker
成功启动日志:
[INFO] Dramatiq '2.0.0' is booting up. [INFO] Using broker: RedisBroker(url=redis://:redis123@localhost:9001/0) [INFO] Declared queue 'scan' [INFO] Starting 4 worker processes. [INFO] Worker process is ready for action.
测试流程
访问
http://localhost:8000/docs→ 找到/tasks/scan接口 →Try it out。输入请求参数(示例):
{ "user_id": "user_123", "storage_id": "storage_456", "scan_path": "/media/files" }点击
Execute→ 得到task_id(如a1b2c3d4-...)。查看终端 2(消费者):会打印
接收到扫描任务→ 等待 5 秒 → 打印执行成功。用
/tasks/{task_id}接口查询状态:会显示status: success。
预期效果
终端 1(生产者):打印
📤 扫描任务已提交。终端 2(消费者):打印
✅ 接收到扫描任务→ 5 秒后 →✅ 执行成功。Redis 中能查到任务状态:
redis-cli -h localhost -p 9001 -a redis123 -n 0 hgetall "scan_task:{你的task_id}"
多文件结构
多文件(生产者 / 消费者分开、多队列), producer.py , consumers.py , broker.py
我遇到的问题:
消费者不执行任务,核心原因是 「Broker 实例不一致」+「事件循环错误」+「潜在的模块导入 / 队列声明问题」
一、最核心问题:Broker 实例不一致(多文件拆分的致命坑)
这是导致任务「入队成功但消费者看不到」的根本原因,也是多文件拆分时最容易踩的坑。
问题原理
- 你的
broker.py虽然用了单例模式(_broker_instance),但 Python 多模块导入时,可能会创建多个 Broker 实例(比如producer.py和consumers.py各自导入broker.py,若导入时机不同或存在循环导入,可能导致单例失效)。 - 生产者用「实例 A」发送任务到 Redis 队列,消费者用「实例 B」监听队列,两者看似配置相同,但本质是两个独立的 Broker 实例,消费者自然看不到生产者发送的任务。
关键证据
你之前执行 redis-cli keys "dramatiq:*" 时,能看到 dramatiq:scan 队列(生产者成功入队),但消费者启动后没有消费,说明消费者的 Broker 实例没有监听这个队列(或监听了其他地方)。
修复方案:强制全局统一 Broker 实例
- 修改
broker.py:确保全局唯一实例,且自动注册到 Dramatiq
from dramatiq import set_broker # 新增:全局注册 Broker
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import TimeLimit, Retries
from core.config import get_settings
_broker_instance = None
def get_broker() -> RedisBroker:
global _broker_instance
if _broker_instance is None:
s = get_settings()
# 初始化 Broker(保持你的配置)
_broker_instance = RedisBroker(
url="redis://:redis123@localhost:9001/0",
middleware=[TimeLimit(), Retries()]
)
# 声明所有队列(确保生产者和消费者都能识别)
for queue in ["scan", "metadata", "persist", "delete", "localize"]:
_broker_instance.declare_queue(queue)
# 关键:全局注册 Broker,让所有 Actor 默认使用这个实例
set_broker(_broker_instance)
return _broker_instance
# 初始化时自动创建并注册 Broker(避免导入时未初始化)
broker = get_broker()
2.修改 consumers.py:移除本地 Broker 实例,直接使用全局 Broker
# consumers.py 顶部修改
from .broker import broker # 直接导入全局 Broker 实例(已注册)
# 提前定义 _now() 函数(避免找不到)
def _now() -> str:
import datetime
return datetime.datetime.now().isoformat()
# Actor 装饰器无需显式指定 broker=broker(全局已注册)
@actor(queue_name="scan") # 移除 broker=broker
def scan_worker(task_id: str, payload: Dict[str, Any]) -> None:
logger.info(f"✅ 消费者接收到扫描任务:task_id={task_id},payload={payload}")
store = get_state_store()
# 修复事件循环问题(和单文件一致)
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 后续逻辑不变...
loop.run_until_complete(store.update_status(task_id, TaskStatus.RUNNING))
async def _run():
from services.scan.unified_scan_engine import get_unified_scan_engine
eng = await get_unified_scan_engine()
async def _progress(scanned: int, media_found: int):
try:
await store.update_status(task_id, TaskStatus.RUNNING, updated_at=_now())
except Exception as e:
logger.error(f"任务 {task_id} 进度更新失败:{e}", exc_info=True)
storage_id = int(payload["storage_id"])
scan_path = payload["scan_path"]
res = await eng.scan_storage(storage_id=storage_id, scan_path=scan_path, progress_cb=_progress)
logger.info(f"扫描任务 {task_id} 完成:新文件 {len(res.new_file_ids)} 个")
await store.update_status(
task_id, TaskStatus.SUCCESS,
finished_at=_now(),
updated_at=_now(),
result=json.dumps({"new_file_count": len(res.new_file_ids)})
)
loop.run_until_complete(_run())
except Exception as e:
error_msg = str(e)
logger.error(f"❌ 任务 {task_id} 执行失败:{error_msg}", exc_info=True)
loop.run_until_complete(
store.update_status(
task_id, TaskStatus.FAILED,
error_message=error_msg,
finished_at=_now(),
updated_at=_now()
)
)
# 其他 Actor(metadata_worker/persist_worker 等)也做同样修改:
# 1. 移除 broker=broker 参数
# 2. 修复事件循环逻辑
3.修改 producer.py:使用全局 Broker,无需手动绑定 Actor.broker
# producer.py 顶部修改
from .broker import broker # 导入全局 Broker(确保已注册)最早导入
# create_scan_task 中移除手动绑定 Broker 的代码
async def create_scan_task(user_id: str, storage_id: str, scan_path: str, *, priority: TaskPriority = TaskPriority.NORMAL, idempotency_key: Optional[str] = None) -> str:
payload = ScanPayload(user_id=user_id, storage_id=storage_id, scan_path=scan_path).model_dump()
if idempotency_key:
payload["idempotency_key"] = idempotency_key
logger.info(f"创建扫描任务:{user_id} {storage_id} {scan_path}")
# 1. 初始化任务状态(必须,否则消费者更新状态时可能找不到)
task_id = _new_task_id()
store = get_state_store()
rec = TaskRecord(
task_id=task_id,
task_type="scan",
queue="scan",
status=TaskStatus.PENDING,
payload=payload,
max_retries=3,
time_limit_ms=None,
created_at=_now_iso(),
)
await store.create_task(rec)
# 2. 移除手动绑定 Broker(全局已注册,Actor 自动使用)
# broker = get_broker() # 注释掉
# scan_worker.broker = broker # 注释掉
# 3. 直接发送任务(Actor 会使用全局 Broker)
logger.info(f"发送扫描任务 {task_id} 到队列 'scan',payload: {payload}")
scan_worker.send(task_id, payload)
# 4. 幂等键设置(可选)
if idempotency_key:
try:
await store.set_idempotency(idempotency_key, task_id)
except Exception as e:
logger.error(f"设置幂等键失败:{e}")
return task_id
二、第二个潜在问题:模块导入顺序 / 循环导入
多文件拆分时,若 producer.py 和 consumers.py 相互导入(比如 consumers.py 导入 producer.py 的 create_metadata_task),会导致 Actor 未被正确初始化(@actor 装饰器未执行),进而导致任务无法被消费。
修复方案:避免循环导入
- 将「任务创建函数」(如
create_metadata_task)单独拆分到task_factory.py,避免producer.py和consumers.py直接导入。 - 若必须在消费者中创建后续任务,使用「延迟导入」(在函数内部导入):
# consumers.py 的 scan_worker 中,若需要创建 delete 任务:
async def _run():
# ... 扫描逻辑 ...
try:
# 延迟导入,避免循环导入
from .task_factory import create_delete_task
await create_delete_task(...)
except Exception as e:
logger.error(f"创建 delete 任务失败:{e}")
修复方案:确保消费者启动时初始化 Broker
在 consumers.py 顶部添加:
# consumers.py 顶部
from .broker import broker # 导入时自动初始化 Broker 并声明队列
死信队列(Dead Letter)配置
场景 :Redis 配置死信队列
Redis 无需手动创建死信队列,Dramatiq 会自动在 Redis 中创建 dramatiq:dead-letters:{queue_name} 键(Sorted Set 结构,score 为消息时间戳,value 为消息序列化内容)。
配置代码示例:
import dramatiq
from dramatiq.brokers.redis import RedisBroker
# 初始化 Redis Broker(生产环境需配置密码、集群/哨兵)
redis_broker = RedisBroker(
url="redis://:your-redis-password@your-redis-host:6379/0", # 数据库 0 用于业务,避免混用
max_connections=100, # 配置连接池,避免连接泄露
)
# 配置全局重试策略(与 RabbitMQ 一致)
redis_broker.add_middleware(
dramatiq.middleware.Retries(
max_retries=3,
min_backoff=1000,
max_backoff=300000,
max_age=86400000,
)
)
dramatiq.set_broker(redis_broker)
3.3 局部配置:单个 Actor 覆盖全局策略
若部分 Actor 需特殊重试策略(如核心业务 Actor 重试次数更多),可在 Actor 装饰器中单独配置,覆盖全局参数:
@dramatiq.actor(
queue_name="payment-process-queue", # 核心业务队列,单独配置
max_retries=5, # 支付相关消息重试 5 次(多于全局的 3 次)
time_limit=60000, # 超时时间 60 秒(更长)
)
def process_payment(amount: float, user_id: str):
# 支付处理逻辑
pass
4.2 Redis:通过命令行 / 客户端查看
Redis 死信存储在 dramatiq:dead-letters:{queue_name} 键中(Sorted Set),可通过 redis-cli 或可视化工具(如 RedisInsight)查看:
1. 查看死信队列的消息数量
# 查看 order-process-queue 的死信数量
redis-cli -h your-redis-host -a your-redis-password ZCARD dramatiq:dead-letters:order-process-queue
2. 查看所有死信消息(按时间戳排序)
# 查看前 10 条死信消息(WITHSCORES 显示时间戳)
redis-cli -h your-redis-host -a your-redis-password ZRANGE dramatiq:dead-letters:order-process-queue 0 9 WITHSCORES
3. 解析消息内容
Redis 中死信消息是 JSON 序列化 的字符串,需解析后查看:
import json
import redis
redis_client = redis.Redis(
host="your-redis-host",
port=6379,
password="your-redis-password",
db=0,
)
# 获取死信消息(前 1 条)
dead_letters = redis_client.zrange(
"dramatiq:dead-letters:order-process-queue", 0, 0, withscores=True
)
if dead_letters:
msg_json, timestamp = dead_letters[0]
msg = json.loads(msg_json)
print("消息内容:", msg["args"]) # 消息参数(如 (100.0, "user_123"))
print("重试次数:", msg["retries"]) # 已重试次数
print("失败原因:", msg["exception"]) # 异常信息(如 "ConnectionTimeout")
print("时间戳:", timestamp) # 进入死信的时间(毫秒)
处理死信消息
Redis 手动重投:
通过 dramatiq.broker 的 enqueue 方法重新投递:
import dramatiq
from dramatiq.brokers.redis import RedisBroker
redis_broker = RedisBroker(url="redis://:your-password@your-host:6379/0")
dramatiq.set_broker(redis_broker)
# 1. 先删除死信队列中的消息(避免重复处理)
redis_broker.client.zremrangebyrank(
"dramatiq:dead-letters:order-process-queue", 0, 0 # 删除第一条死信
)
# 2. 重新投递消息到业务队列
@dramatiq.actor(queue_name="order-process-queue")
def process_order(order_id: str):
pass
# 重新入队(使用原始参数)
process_order.send("order_456")
自动处理:适合高频、可自动化修复的场景
对于可通过代码自动修复的死信(如重试依赖服务、补全缺失数据),可编写 死信消费者 Actor,监听死信队列并自动处理:
示例:Redis 死信自动处理 Actor
import dramatiq
import time
from dramatiq.brokers.redis import RedisBroker
redis_broker = RedisBroker(url="redis://:your-password@your-host:6379/0")
dramatiq.set_broker(redis_broker)
# 死信消费者 Actor:每 10 秒检查一次死信队列
@dramatiq.actor(queue_name="dead-letter-processor", time_limit=30000)
def process_dead_letters(queue_name: str):
dead_letter_key = f"dramatiq:dead-letters:{queue_name}"
# 1. 获取所有死信消息
dead_letters = redis_broker.client.zrange(dead_letter_key, 0, -1, withscores=True)
if not dead_letters:
return
for msg_json, timestamp in dead_letters:
msg = json.loads(msg_json)
order_id = msg["args"][0] # 假设消息参数是 (order_id,)
try:
# 2. 尝试自动修复(如检查订单状态、重试依赖服务)
if is_temporary_failure(msg["exception"]): # 自定义判断临时故障的函数
# 3. 修复成功,重新投递到业务队列
process_order.send(order_id)
# 4. 从死信队列删除该消息
redis_broker.client.zrem(dead_letter_key, msg_json)
print(f"自动修复并投递消息: {order_id}")
else:
# 5. 永久故障,触发告警(如发送邮件/短信)
send_alert(f"永久死信: {order_id}, 原因: {msg['exception']}")
except Exception as e:
print(f"处理死信失败: {str(e)}")
time.sleep(1) # 避免处理失败时频繁重试
# 业务 Actor(需与死信对应的业务队列一致)
@dramatiq.actor(queue_name="order-process-queue")
def process_order(order_id: str):
pass
# 辅助函数:判断是否为临时故障
def is_temporary_failure(exception_msg: str) -> bool:
temporary_errors = ["ConnectionTimeout", "ServiceUnavailable", "503"]
return any(err in exception_msg for err in temporary_errors)
# 辅助函数:发送告警
def send_alert(msg: str):
# 集成企业微信/钉钉/邮件告警接口
pass
生产环境监控与告警(关键)
死信队列的堆积往往意味着业务故障(如依赖服务宕机、代码 bug),需配置监控告警,避免问题被忽略。
6.1 核心监控指标
| 指标 | 说明 | 告警阈值建议 |
|---|---|---|
| 死信队列消息数 | 死信队列中未处理的消息总量 | 超过 10 条(根据业务调整) |
| 死信增长率 | 单位时间内新增死信的数量 | 5 分钟内增长超过 5 条 |
| 死信存活时间 | 死信消息在队列中的留存时间 | 超过 24 小时(未处理) |
Redis:使用 Redis Exporter + Prometheus
- 部署 Redis Exporter(监控 Redis 指标);
- Prometheus 采集
redis_sorted_set_size指标(过滤key=~"dramatiq:dead-letters:*"); - 后续步骤与 RabbitMQ 一致,通过 Grafana 监控并配置告警。
元数据版本管理
一、先明确 3 个核心概念的定位(结合代码模型)
在现有代码的MediaCore和新增的MediaVersion体系中,剧集版本管理的核心是 “内容主体不变,文件版本分层”:
| 概念 | 类型(MediaCore.kind/MediaVersion.scope) |
作用(代表什么) | 举例 |
|---|---|---|---|
| 季核心 | MediaCore.kind = "season" |
某一季的 “内容主体”(比如《绝命毒师》第 1 季),内容唯一,不随文件版本变 | 核心 ID=101,title=“绝命毒师 第 1 季” |
| 季版本 | MediaVersion.scope = "season_group" |
某一季的 “文件批次版本”(按文件夹划分),同一季核心可对应多个季版本 | 版本 1:文件夹 “绝命毒师 S01 - 蓝光版”;版本 2:文件夹 “绝命毒师 S01-4K 重制版” |
| 单集子版本 | MediaVersion.scope = "episode_child" |
某一集的 “单个文件版本”,每个文件对应一个子版本,且关联到所属的季版本 | 子版本 A:“S01E01 - 蓝光版.mkv”;子版本 B:“S01E01-4K 重制版.mkv” |
| 单集核心 | MediaCore.kind = "episode" |
某一集的 “内容主体”(比如《绝命毒师》S01E01),内容唯一 | 核心 ID=201,title=“绝命毒师 S01E01: Pilot” |
二、你的理解对吗?逐一验证
你提到的 3 个点,本质是 “内容 - 版本 - 文件” 的对应关系,全部正确,但需要补充 “层级关联” 的细节:
1. “episodecore 一个集核心媒体对应多个版本,版本和文件一一对应” → 完全正确
原因:单集核心(如 S01E01 的
MediaCore)代表 “这一集的内容”(剧情、时长、演职人员等固定信息),而 “版本” 是 “内容的不同文件载体”(比如蓝光版文件、4K 版文件)。对应关系:
1个episode core ← 多个子版本(MediaVersion.scope=episode_child) ← 每个子版本对应1个文件(FileAsset)例:
- episode core=201(S01E01 内容)
- 子版本 1:关联文件 “绝命毒师 S01 - 蓝光版 / S01E01.mkv”
- 子版本 2:关联文件 “绝命毒师 S01-4K 重制版 / S01E01.mkv”
2. “季版本是一个季核心对应多个季版本” → 完全正确
原因:季核心(如 “绝命毒师第 1 季” 的
MediaCore)代表 “这一季的内容主体”(10 集剧情框架、整体主题等),而 “季版本” 是 “这一季的不同文件批次”(按文件夹划分,同一批次的文件属于同一版本)。对应关系:
1个season core ← 多个季版本(MediaVersion.scope=season_group)例:
- season core=101(绝命毒师第 1 季内容)
- 季版本 1:文件夹 “绝命毒师 S01 - 蓝光版”(包含 S01E01~E10 的蓝光文件)
- 季版本 2:文件夹 “绝命毒师 S01-4K 重制版”(包含 S01E01~E10 的 4K 文件)
3. “季版本是根据集文件路径中的父文件夹确定,每个集的版本都有对应的季版本” → 正确,需补充 “层级关联逻辑”
- 关键细节:单集子版本通过
parent_version_id关联到季版本,而季版本的 “唯一标识” 就是 “单集文件的父文件夹路径”(代码中MediaVersion.version_path存储该路径)。 - 具体流程(结合文件导入场景):
- 导入文件 “绝命毒师 S01 - 蓝光版 / S01E01.mkv” → 提取父文件夹路径 “绝命毒师 S01 - 蓝光版”;
- 查找 “season core=101” 下是否有
version_path="绝命毒师S01-蓝光版"的季版本 → 没有则创建季版本 A(scope=season_group); - 为该文件创建子版本 B(scope=episode_child),并设置
parent_version_id=季版本A的ID; - 子版本 B 关联到 “episode core=201”(S01E01 内容),同时
FileAsset.version_id=子版本B的ID。
- 最终效果:同一文件夹下的所有单集文件(S01E01~E10),都会关联到同一个季版本(因为父文件夹相同),形成 “季版本→一批子版本(单集文件) ” 的批量管理关系。
三、用一张 “链路图” 总结所有对应关系
以《绝命毒师》第 1 季为例,完整链路如下:
# 内容层(固定不变)
season core=101(绝命毒师第1季内容)
├─ episode core=201(S01E01内容)
├─ episode core=202(S01E02内容)
└─ ...(共10个episode core)
# 版本层(按文件夹划分,关联内容层)
季版本A(scope=season_group,version_path="绝命毒师S01-蓝光版")→ 关联season core=101
├─ 子版本A1(scope=episode_child)→ 关联episode core=201 → 对应文件“蓝光版/S01E01.mkv”
├─ 子版本A2(scope=episode_child)→ 关联episode core=202 → 对应文件“蓝光版/S01E02.mkv”
└─ ...(10个子版本,对应蓝光版10个文件)
季版本B(scope=season_group,version_path="绝命毒师S01-4K重制版")→ 关联season core=101
├─ 子版本B1(scope=episode_child)→ 关联episode core=201 → 对应文件“4K版/S01E01.mkv”
├─ 子版本B2(scope=episode_child)→ 关联episode core=202 → 对应文件“4K版/S01E02.mkv”
└─ ...(10个子版本,对应4K版10个文件)
新增季版本分层管理逻辑
本次更新核心是落地季版本(season_group)与单集子版本(episode_child)的层级关联,通过文件父文件夹路径生成季版本,单集版本通过parent_version_id关联到季版本,实现 “季版本管文件夹、子版本管单文件” 的分层管理。
第一步:更新数据库模型(MediaVersion 新增父版本关联字段)
首先补充MediaVersion的parent_version_id字段,用于关联子版本到父季版本
完整流程示例
假设存在文件:/movies/绝命毒师S01-蓝光版/S01E01.mkv、/movies/绝命毒师S01-蓝光版/S01E02.mkv
- 处理
S01E01.mkv时,提取父文件夹路径/movies/绝命毒师S01-蓝光版,创建季版本(season_group),ID 为 1001。 - 创建单集版本(
episode_child),ID 为 2001,parent_version_id=1001,关联到该文件。 - 处理
S01E02.mkv时,提取相同父文件夹路径,复用季版本 1001。 - 创建单集版本 2002,
parent_version_id=1001,关联到该文件。 - 此时,季版本 1001 管理着 2001、2002 两个单集版本,实现批量管理。