扫描服务和存储服务优化
架构优化-异步,单例
1. 架构全景:分层与职责
在你的架构中,核心在于**“控制流”与“数据流”的分离**。
- 业务逻辑层 (Dramatiq Worker):负责“想”。它决定扫描哪个目录、如何对比数据库、何时更新记录。它是一个 无状态 的消费者。
- 资源中台层 (StorageService):负责“做”。它持有 有状态 的连接(TCP 句柄、Session)。应该将
StorageService改造为一个严格单例,并引入异步锁 - 持久化层 (PostgreSQL/SqlModel):负责“记”。它是最终的一致性来源。
2. 异步处理的底层逻辑
在 Python 的 asyncio 环境下,异步的核心不是“并行”(同时做多件事),而是**“非阻塞等待”**。
- 网络 IO 密集型:当你调用 WebDAV 的
list_dir时,CPU 是闲置的。异步让出 CPU 去处理另一个租户的扫描任务,而不需要像传统多线程那样为每个任务分配 8MB 的栈内存。 - Dramatiq 的角色:Dramatiq 本身支持多进程。在每个进程内部,你可以运行异步代码(通过
asyncio.run或集成异步 worker)。这形成了 “多进程(水平扩展)+ 协程(垂直并发)” 的强大组合。
3. 处理异步与并发冲突(核心痛点)
在多租户、分布式环境下,冲突主要发生在三个地方:
A. 连接创建冲突(缓存击穿)
场景:Dramatiq 收到 10 个扫描任务,全是租户 A 的。这 10 个协程几乎同时到达 StorageService.get_client(storage_id)。
解决方案:按 Key 分片的异步锁。
我们不能给整个 get_client 加锁(那会让所有租户排队),而是给特定的 storage_id 加锁。
# StorageService 内部逻辑
async def get_client(self, storage_id: int):
if storage_id not in self._locks:
self._locks[storage_id] = asyncio.Lock()
async with self._connection_locks[storage_id]:
if storage_id in self._clients:
return self._clients[storage_id]
# 执行昂贵的握手操作...
B. 数据库写冲突(乐观锁/事务)
场景:同一个文件在极短时间内被两个任务扫描(比如用户手动触发扫描的同时,定时任务也开始了)。
解决方案:
- 数据库唯一约束:在
full_path + storage_id + user_id上建立复合唯一索引。这是最后的防线。 - Upsert 机制:利用 PostgreSQL 的
ON CONFLICT DO UPDATE。这把“检查是否存在 -> 更新/插入”合并为一条原子操作,消除了应用层的竞态条件。
C. 任务排重(分布式锁)
场景:两个 Dramatiq Worker 都在扫同一个存储。
解决方案:分布式锁 (Redis)。
在任务进入 process_batch 前,检查 Redis 中是否存在 scanning:storage_1 的 Key。如果存在,跳过或延迟重试。
4. 架构优化建议:存储中台的“健康状态”
既然 StorageService 是中台,它必须能自我感知连接质量:
- 心跳检测:在单例的
_clients缓存中,不能无限期存储 Client。建议给 Client 增加last_used时间戳,或者在get_client时执行轻量级的check_connection。 - 断线自动清理:如果 SMB 连接因为网络波动断开,
StorageService必须捕获异常并调用release_client(storage_id),确保下次请求能重新建立连接,而不是返回一个死掉的句柄。 - 租户限流:在中台层可以方便地实现租户限流。例如:限制 WebDAV 类型的存储全局并发请求数不得超过 50,防止因触发云盘反爬虫而导致所有租户不可用。
5. 总结
| 问题 | 解决方案 | 作用位置 |
|---|---|---|
| 重复建立连接 | asyncio.Lock (按 ID 分片) |
StorageService |
| 数据入库冲突 | ON CONFLICT (UPSERT) |
Repository |
| 重复执行扫描 | Redis 分布式锁 / Dramatiq 任务排重 | Task Entry |
| 连接资源泄露 | 单例模式 + asyncio.create_task 异步释放 |
全局 |
抽象基类(StorageClient)明确了行为,工厂模式(StorageClientFactory)解决了实例化问题。
为了将这个 StorageService 进化为真正的“中台级”服务,我们需要解决以下四个核心问题:并发安全、资源泄漏、数据库性能、以及对“扫描任务”的深度支持。
1. 核心改进:StorageService 的架构升级
你应该将 StorageService 改造为一个严格单例,并引入异步锁。
import asyncio
import threading
from typing import Dict, Any, Optional
from sqlmodel import select
from core.db import AsyncSessionLocal # 建议切换到异步Session
class StorageService:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
# 确保只初始化一次
if not hasattr(self, "_initialized"):
self._clients: Dict[str, StorageClient] = {}
self._connection_locks: Dict[str, asyncio.Lock] = {}
self._initialized = True
async def get_client_by_id(self, storage_id: int) -> StorageClient:
"""
供扫描服务使用的核心接口:通过 ID 获取已连接的客户端
"""
# 使用异步 Session,避免阻塞事件循环
async with AsyncSessionLocal() as session:
stmt = select(StorageConfig).where(StorageConfig.id == storage_id)
result = await session.execute(stmt)
storage_config = result.scalar_one_or_none()
if not storage_config:
raise ValueError(f"存储配置 ID={storage_id} 不存在")
# 构建缓存键(直接用 ID 更稳定)
client_key = str(storage_id)
# 1. 检查缓存
if client_key in self._clients:
return self._clients[client_key]
# 2. 异步加锁,防止并发创建同一个连接
if client_key not in self._connection_locks:
self._connection_locks[client_key] = asyncio.Lock()
async with self._connection_locks[client_key]:
# 双重检查
if client_key in self._clients:
return self._clients[client_key]
# 3. 创建并连接
detailed_config = self._get_detailed_config(session, storage_config)
client = StorageClientFactory.create(
storage_config.storage_type,
storage_config.name,
detailed_config
)
await client.connect()
self._clients[client_key] = client
return client
2. 存储客户端(StorageClient)的优化
为了配合 UnifiedScanEngine 的并发扫描,基类和具体实现需要增加两个核心能力:
A. 引入并发控制(Semaphore)
在基类中内置信号量,防止扫描器对同一个 SMB/WebDAV 服务器发起成百上千的并发请求。
class StorageClient(ABC):
def __init__(self, name: str, config: Dict[str, Any]):
self.name = name
self.config = config
self._connected = False
# 限制针对该存储实例的并发请求数(默认3-5比较安全)
self.semaphore = asyncio.Semaphore(config.get("max_concurrency", 5))
async def list_dir_safe(self, path: str, depth: int = 1) -> List[StorageEntry]:
"""带并发保护的目录列表"""
async with self.semaphore:
return await self.list_dir(path, depth)
B. 统一路径处理
不同后端(Windows SMB 的 \ 与 WebDAV 的 /)路径分隔符不同。建议在基类中增加一个 normalize_path 方法,确保扫描引擎处理的路径永远是统一格式。
3. 针对扫描任务的架构设计(中台调用流)
当你使用 Dramatiq 处理扫描任务时,流程如下:
- Worker 启动:任务进入进程,获取全局单例
storage_service。 - 获取客户端:调用
await storage_service.get_client_by_id(storage_id)。 - 并发扫描:
- 扫描引擎不使用递归。
- 使用
asyncio.Queue存储发现的子目录。 - 开启 3-5 个
worker协程,每个协程从队列取路径,并调用client.list_dir_safe()。
- 资源回收:单例会一直持有连接。只有当用户在 Web 界面删除或禁用了该存储配置时,才通过
storage_service.remove_client(id)显式关闭连接。
4. 关键优化点总结
| 优化项 | 解决的问题 | 实现方案 |
|---|---|---|
异步锁 (asyncio.Lock) |
解决多个扫描任务同时启动时,重复建立同一个连接的问题。 | 在 get_client 中按 ID 加锁。 |
信号量 (Semaphore) |
防止并发扫描过快导致被网盘/NAS 封锁。 | 限制单个 Client 的并发请求数。 |
| 异步数据库查询 | 解决在异步环境下调用同步 get_session 导致的卡顿。 |
切换到 sqlalchemy.ext.asyncio。 |
| 解耦 Session 管理 | 解决 StorageService 过度依赖数据库会话的问题。 |
仅在初始化 Client 时查询 DB,之后全内存操作。 |
扫描流程优化:
递归扫描改为基于队列的并发扫描
目前代码是深度优先递归,效率瓶颈在于由于 await 递归调用,必须等待一个文件夹扫描完才能开始下一个。改为广度优先 + 多协程并行后,可以显著提升 IO 密集型任务(如云存储或远程磁盘扫描)的性能。
async for batch in self._scan_in_batches(storage_client, scan_path, recursive, max_depth, batch_size):
batch_results = await self.processor.process_batch(batch, {
"storage_id": storage_id,
"user_id": user_id
})
async def _scan_in_batches(self, client: StorageClient, path: str, recursive: bool, max_depth: int, size: int) -> AsyncGenerator[List[StorageEntry], None]:
"""分批产生文件列表"""
current_batch: List[StorageEntry] = []
async for entry in self._recursive_walk(client, path, recursive, max_depth, 0):
current_batch.append(entry)
if len(current_batch) >= size:
yield current_batch
current_batch = []
if current_batch:
yield current_batch
async def _recursive_walk(self, client: StorageClient, path: str, recursive: bool, max_depth: int, depth: int) -> AsyncGenerator[StorageEntry, None]:
"""递归遍历存储"""
if depth >= max_depth:
return
try:
entries: List[StorageEntry] = await client.list_dir(path)
for entry in entries:
if not entry.is_dir:
yield entry
elif recursive:
async for sub in self._recursive_walk(client, entry.path, recursive, max_depth, depth + 1):
yield sub
except Exception as e:
logger.error(f"List dir failed {path}: {e}")
改进后代码:
# --- 核心改造:引入并发队列 ---
dir_queue = asyncio.Queue() # 待扫描目录队列
file_queue = asyncio.Queue(maxsize=batch_size * 2) # 扫描出的文件缓存,限长防止内存溢出
# 记录正在运行的任务,用于优雅关闭
await dir_queue.put((scan_path, 0)) # (路径, 当前深度)
# 1. 启动【扫描小分队】
scan_workers = [
asyncio.create_task(self._worker_scanner(storage_client, dir_queue, file_queue, kwargs))
for _ in range(self.max_scan_workers)
]
# 2. 启动【入库处理器】
processor_worker = asyncio.create_task(
self._worker_processor(file_queue, result, seen_paths, media_paths, storage_id, user_id, batch_size, kwargs.get('progress_cb'))
)
# 3. 等待所有目录扫描完成
await dir_queue.join()
# 4. 清理:停止扫描协程
for w in scan_workers:
w.cancel()
# 5. 告知处理器没有更多文件了
await file_queue.put(None)
await processor_worker
async def _worker_scanner(self, client: StorageClient, in_q: asyncio.Queue, out_q: asyncio.Queue, config: dict):
"""扫描协程:不停从队列拿目录,list_dir,结果塞入文件队列"""
recursive = config.get('recursive', True)
max_depth = config.get('max_depth', 10)
while True:
try:
path, depth = await in_q.get()
if depth >= max_depth:
in_q.task_done()
continue
entries = await client.list_dir(path)
for entry in entries:
if entry.is_dir:
if recursive:
await in_q.put((entry.path, depth + 1))
else:
# 发现文件,塞给处理器
await out_q.put(entry)
in_q.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Worker scan error on {path}: {e}")
in_q.task_done()
async def _worker_processor(self, in_q: asyncio.Queue, result: ScanResult, seen_paths: Set[str], media_paths: Set[str], storage_id, user_id, batch_size, progress_cb):
"""处理协程:攒批,调用 process_batch"""
batch = []
async def flush_batch():
nonlocal batch
if not batch: return
# 这里的业务逻辑和原代码一致
batch_results = await self.processor.process_batch(batch, {"storage_id": storage_id, "user_id": user_id})
# 统计汇总 (此处由于只有一个 processor worker,不需要加锁)
result.total_files += len(batch)
for entry in batch:
seen_paths.add(entry.path)
if not entry.is_dir and self.processor._is_media_file(entry.path):
media_paths.add(entry.path)
for r in batch_results:
status = r.get("status")
if status == "new":
result.new_files += 1
if "file_id" in r: result.new_file_ids.append(r["file_id"])
elif status == "updated":
result.updated_files += 1
elif status == "error":
result.errors += 1
result.error_details.append(r)
result.media_files = len(media_paths)
if progress_cb:
res = progress_cb(result.total_files, result.media_files)
if asyncio.iscoroutine(res): await res
batch = []
while True:
entry = await in_q.get()
if entry is None: # 收到结束信号
await flush_batch()
in_q.task_done()
break
batch.append(entry)
if len(batch) >= batch_size:
await flush_batch()
in_q.task_done()
改进点解析
- 生产者-消费者模型 (
asyncio.Queue):queue: 存储待扫描的目录。file_batch_queue: 存储扫描出来的文件实体。
- 并发扫描 (
max_workers):- 启动了多个
_scanner_worker。当一个 worker 在等待list_dir的网络响应时,其他 worker 可以继续处理队列中的其他路径。这对于延迟较高的存储(如 S3, WebDAV, 百度网盘)提升极大。
- 启动了多个
- 职责分离:
- Scanner: 只负责 IO 读目录。
- Processor: 负责攒批、逻辑计算、写数据库。
- 避免死锁:
- 通过
queue.join()确保所有目录扫描完毕。 - 通过发送
None信号通知Processor扫描已结束,可以处理最后一批剩余数据并退出。
- 通过
注意事项
- 频率限制 (Rate Limiting): 如果扫描的是第三方网盘 API,
max_workers设置过大会导致429 Too Many Requests。建议设置为 5-10 之间。 - 内存控制: 如果文件量极大(百万级),
seen_paths集合会占用较多内存。如果内存敏感,建议将seen_paths替换为 Bloom Filter 或临时数据库表。 - 深度控制: 为了支持
max_depth,建议将入队的数据结构改为元组await queue.put((entry.path, current_depth + 1))。