扫描服务和存储服务优化

架构优化-异步,单例


1. 架构全景:分层与职责

在你的架构中,核心在于**“控制流”与“数据流”的分离**。

  • 业务逻辑层 (Dramatiq Worker):负责“想”。它决定扫描哪个目录、如何对比数据库、何时更新记录。它是一个 无状态 的消费者。
  • 资源中台层 (StorageService):负责“做”。它持有 有状态 的连接(TCP 句柄、Session)。应该将 StorageService 改造为一个严格单例,并引入异步锁
  • 持久化层 (PostgreSQL/SqlModel):负责“记”。它是最终的一致性来源。

2. 异步处理的底层逻辑

在 Python 的 asyncio 环境下,异步的核心不是“并行”(同时做多件事),而是**“非阻塞等待”**。

  • 网络 IO 密集型:当你调用 WebDAV 的 list_dir 时,CPU 是闲置的。异步让出 CPU 去处理另一个租户的扫描任务,而不需要像传统多线程那样为每个任务分配 8MB 的栈内存。
  • Dramatiq 的角色:Dramatiq 本身支持多进程。在每个进程内部,你可以运行异步代码(通过 asyncio.run 或集成异步 worker)。这形成了 “多进程(水平扩展)+ 协程(垂直并发)” 的强大组合。

3. 处理异步与并发冲突(核心痛点)

在多租户、分布式环境下,冲突主要发生在三个地方:

A. 连接创建冲突(缓存击穿)

场景:Dramatiq 收到 10 个扫描任务,全是租户 A 的。这 10 个协程几乎同时到达 StorageService.get_client(storage_id)。

解决方案:按 Key 分片的异步锁。

我们不能给整个 get_client 加锁(那会让所有租户排队),而是给特定的 storage_id 加锁。

# StorageService 内部逻辑
async def get_client(self, storage_id: int):
    if storage_id not in self._locks:
        self._locks[storage_id] = asyncio.Lock()
        
    async with self._connection_locks[storage_id]:
        if storage_id in self._clients:
            return self._clients[storage_id]
        # 执行昂贵的握手操作...

B. 数据库写冲突(乐观锁/事务)

场景:同一个文件在极短时间内被两个任务扫描(比如用户手动触发扫描的同时,定时任务也开始了)。

解决方案:

  1. 数据库唯一约束:在 full_path + storage_id + user_id 上建立复合唯一索引。这是最后的防线。
  2. Upsert 机制:利用 PostgreSQL 的 ON CONFLICT DO UPDATE。这把“检查是否存在 -> 更新/插入”合并为一条原子操作,消除了应用层的竞态条件。

C. 任务排重(分布式锁)

场景:两个 Dramatiq Worker 都在扫同一个存储。

解决方案:分布式锁 (Redis)。

在任务进入 process_batch 前,检查 Redis 中是否存在 scanning:storage_1 的 Key。如果存在,跳过或延迟重试。


4. 架构优化建议:存储中台的“健康状态”

既然 StorageService 是中台,它必须能自我感知连接质量:

  1. 心跳检测:在单例的 _clients 缓存中,不能无限期存储 Client。建议给 Client 增加 last_used 时间戳,或者在 get_client 时执行轻量级的 check_connection
  2. 断线自动清理:如果 SMB 连接因为网络波动断开,StorageService 必须捕获异常并调用 release_client(storage_id),确保下次请求能重新建立连接,而不是返回一个死掉的句柄。
  3. 租户限流:在中台层可以方便地实现租户限流。例如:限制 WebDAV 类型的存储全局并发请求数不得超过 50,防止因触发云盘反爬虫而导致所有租户不可用。

5. 总结

问题 解决方案 作用位置
重复建立连接 asyncio.Lock (按 ID 分片) StorageService
数据入库冲突 ON CONFLICT (UPSERT) Repository
重复执行扫描 Redis 分布式锁 / Dramatiq 任务排重 Task Entry
连接资源泄露 单例模式 + asyncio.create_task 异步释放 全局

抽象基类(StorageClient)明确了行为,工厂模式(StorageClientFactory)解决了实例化问题。

为了将这个 StorageService 进化为真正的“中台级”服务,我们需要解决以下四个核心问题:并发安全、资源泄漏、数据库性能、以及对“扫描任务”的深度支持

1. 核心改进:StorageService 的架构升级

你应该将 StorageService 改造为一个严格单例,并引入异步锁

import asyncio
import threading
from typing import Dict, Any, Optional
from sqlmodel import select
from core.db import AsyncSessionLocal  # 建议切换到异步Session

class StorageService:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if not cls._instance:
            with cls._lock:
                if not cls._instance:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # 确保只初始化一次
        if not hasattr(self, "_initialized"):
            self._clients: Dict[str, StorageClient] = {}
            self._connection_locks: Dict[str, asyncio.Lock] = {}
            self._initialized = True

    async def get_client_by_id(self, storage_id: int) -> StorageClient:
        """
        供扫描服务使用的核心接口:通过 ID 获取已连接的客户端
        """
        # 使用异步 Session,避免阻塞事件循环
        async with AsyncSessionLocal() as session:
            stmt = select(StorageConfig).where(StorageConfig.id == storage_id)
            result = await session.execute(stmt)
            storage_config = result.scalar_one_or_none()
            
            if not storage_config:
                raise ValueError(f"存储配置 ID={storage_id} 不存在")

            # 构建缓存键(直接用 ID 更稳定)
            client_key = str(storage_id)

            # 1. 检查缓存
            if client_key in self._clients:
                return self._clients[client_key]

            # 2. 异步加锁,防止并发创建同一个连接
            if client_key not in self._connection_locks:
                self._connection_locks[client_key] = asyncio.Lock()

            async with self._connection_locks[client_key]:
                # 双重检查
                if client_key in self._clients:
                    return self._clients[client_key]

                # 3. 创建并连接
                detailed_config = self._get_detailed_config(session, storage_config)
                client = StorageClientFactory.create(
                    storage_config.storage_type, 
                    storage_config.name, 
                    detailed_config
                )
                await client.connect()
                self._clients[client_key] = client
                return client

2. 存储客户端(StorageClient)的优化

为了配合 UnifiedScanEngine 的并发扫描,基类和具体实现需要增加两个核心能力:

A. 引入并发控制(Semaphore)

在基类中内置信号量,防止扫描器对同一个 SMB/WebDAV 服务器发起成百上千的并发请求。

class StorageClient(ABC):
    def __init__(self, name: str, config: Dict[str, Any]):
        self.name = name
        self.config = config
        self._connected = False
        # 限制针对该存储实例的并发请求数(默认3-5比较安全)
        self.semaphore = asyncio.Semaphore(config.get("max_concurrency", 5))

    async def list_dir_safe(self, path: str, depth: int = 1) -> List[StorageEntry]:
        """带并发保护的目录列表"""
        async with self.semaphore:
            return await self.list_dir(path, depth)

B. 统一路径处理

不同后端(Windows SMB 的 \ 与 WebDAV 的 /)路径分隔符不同。建议在基类中增加一个 normalize_path 方法,确保扫描引擎处理的路径永远是统一格式。


3. 针对扫描任务的架构设计(中台调用流)

当你使用 Dramatiq 处理扫描任务时,流程如下:

  1. Worker 启动:任务进入进程,获取全局单例 storage_service
  2. 获取客户端:调用 await storage_service.get_client_by_id(storage_id)
  3. 并发扫描
    • 扫描引擎不使用递归。
    • 使用 asyncio.Queue 存储发现的子目录。
    • 开启 3-5 个 worker 协程,每个协程从队列取路径,并调用 client.list_dir_safe()
  4. 资源回收:单例会一直持有连接。只有当用户在 Web 界面删除禁用了该存储配置时,才通过 storage_service.remove_client(id) 显式关闭连接。

4. 关键优化点总结

优化项 解决的问题 实现方案
异步锁 (asyncio.Lock) 解决多个扫描任务同时启动时,重复建立同一个连接的问题。 get_client 中按 ID 加锁。
信号量 (Semaphore) 防止并发扫描过快导致被网盘/NAS 封锁。 限制单个 Client 的并发请求数。
异步数据库查询 解决在异步环境下调用同步 get_session 导致的卡顿。 切换到 sqlalchemy.ext.asyncio
解耦 Session 管理 解决 StorageService 过度依赖数据库会话的问题。 仅在初始化 Client 时查询 DB,之后全内存操作。

扫描流程优化:

递归扫描改为基于队列的并发扫描

目前代码是深度优先递归,效率瓶颈在于由于 await 递归调用,必须等待一个文件夹扫描完才能开始下一个。改为广度优先 + 多协程并行后,可以显著提升 IO 密集型任务(如云存储或远程磁盘扫描)的性能。

async for batch in self._scan_in_batches(storage_client, scan_path, recursive, max_depth, batch_size):
                
     batch_results = await self.processor.process_batch(batch, {
         "storage_id": storage_id,
         "user_id": user_id
      })
    
    async def _scan_in_batches(self, client: StorageClient, path: str, recursive: bool, max_depth: int, size: int) -> AsyncGenerator[List[StorageEntry], None]:
        """分批产生文件列表"""
        current_batch: List[StorageEntry] = []
        async for entry in self._recursive_walk(client, path, recursive, max_depth, 0):
            current_batch.append(entry)
            if len(current_batch) >= size:
                yield current_batch
                current_batch = []
        if current_batch:
            yield current_batch

    async def _recursive_walk(self, client: StorageClient, path: str, recursive: bool, max_depth: int, depth: int) -> AsyncGenerator[StorageEntry, None]:
        """递归遍历存储"""
        if depth >= max_depth:
            return

        try:
            entries: List[StorageEntry] = await client.list_dir(path)
            for entry in entries:
                if not entry.is_dir:
                    yield entry
                elif recursive:
                    async for sub in self._recursive_walk(client, entry.path, recursive, max_depth, depth + 1):
                        yield sub
        except Exception as e:
            logger.error(f"List dir failed {path}: {e}")

改进后代码:

# --- 核心改造:引入并发队列 ---
        dir_queue = asyncio.Queue()  # 待扫描目录队列
        file_queue = asyncio.Queue(maxsize=batch_size * 2) # 扫描出的文件缓存,限长防止内存溢出
        
        # 记录正在运行的任务,用于优雅关闭
        await dir_queue.put((scan_path, 0)) # (路径, 当前深度)

        # 1. 启动【扫描小分队】
        scan_workers = [
            asyncio.create_task(self._worker_scanner(storage_client, dir_queue, file_queue, kwargs))
            for _ in range(self.max_scan_workers)
        ]

        # 2. 启动【入库处理器】
        processor_worker = asyncio.create_task(
            self._worker_processor(file_queue, result, seen_paths, media_paths, storage_id, user_id, batch_size, kwargs.get('progress_cb'))
        )

        # 3. 等待所有目录扫描完成
        await dir_queue.join()
        
        # 4. 清理:停止扫描协程
        for w in scan_workers:
            w.cancel()
        
        # 5. 告知处理器没有更多文件了
        await file_queue.put(None)
        await processor_worker
        
	async def _worker_scanner(self, client: StorageClient, in_q: asyncio.Queue, out_q: asyncio.Queue, config: dict):
        """扫描协程:不停从队列拿目录,list_dir,结果塞入文件队列"""
        recursive = config.get('recursive', True)
        max_depth = config.get('max_depth', 10)

        while True:
            try:
                path, depth = await in_q.get()
                if depth >= max_depth:
                    in_q.task_done()
                    continue

                entries = await client.list_dir(path)
                for entry in entries:
                    if entry.is_dir:
                        if recursive:
                            await in_q.put((entry.path, depth + 1))
                    else:
                        # 发现文件,塞给处理器
                        await out_q.put(entry)
                
                in_q.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Worker scan error on {path}: {e}")
                in_q.task_done()

    async def _worker_processor(self, in_q: asyncio.Queue, result: ScanResult, seen_paths: Set[str], media_paths: Set[str], storage_id, user_id, batch_size, progress_cb):
        """处理协程:攒批,调用 process_batch"""
        batch = []
        
        async def flush_batch():
            nonlocal batch
            if not batch: return
            
            # 这里的业务逻辑和原代码一致
            batch_results = await self.processor.process_batch(batch, {"storage_id": storage_id, "user_id": user_id})
            
            # 统计汇总 (此处由于只有一个 processor worker,不需要加锁)
            result.total_files += len(batch)
            for entry in batch:
                seen_paths.add(entry.path)
                if not entry.is_dir and self.processor._is_media_file(entry.path):
                    media_paths.add(entry.path)
            
            for r in batch_results:
                status = r.get("status")
                if status == "new":
                    result.new_files += 1
                    if "file_id" in r: result.new_file_ids.append(r["file_id"])
                elif status == "updated":
                    result.updated_files += 1
                elif status == "error":
                    result.errors += 1
                    result.error_details.append(r)
            
            result.media_files = len(media_paths)
            if progress_cb:
                res = progress_cb(result.total_files, result.media_files)
                if asyncio.iscoroutine(res): await res
            batch = []

        while True:
            entry = await in_q.get()
            if entry is None: # 收到结束信号
                await flush_batch()
                in_q.task_done()
                break
            
            batch.append(entry)
            if len(batch) >= batch_size:
                await flush_batch()
            
            in_q.task_done()

改进点解析

  1. 生产者-消费者模型 (asyncio.Queue):
    • queue: 存储待扫描的目录
    • file_batch_queue: 存储扫描出来的文件实体
  2. 并发扫描 (max_workers):
    • 启动了多个 _scanner_worker。当一个 worker 在等待 list_dir 的网络响应时,其他 worker 可以继续处理队列中的其他路径。这对于延迟较高的存储(如 S3, WebDAV, 百度网盘)提升极大。
  3. 职责分离:
    • Scanner: 只负责 IO 读目录。
    • Processor: 负责攒批、逻辑计算、写数据库。
  4. 避免死锁:
    • 通过 queue.join() 确保所有目录扫描完毕。
    • 通过发送 None 信号通知 Processor 扫描已结束,可以处理最后一批剩余数据并退出。

注意事项

  • 频率限制 (Rate Limiting): 如果扫描的是第三方网盘 API,max_workers 设置过大会导致 429 Too Many Requests。建议设置为 5-10 之间。
  • 内存控制: 如果文件量极大(百万级),seen_paths 集合会占用较多内存。如果内存敏感,建议将 seen_paths 替换为 Bloom Filter 或临时数据库表。
  • 深度控制: 为了支持 max_depth,建议将入队的数据结构改为元组 await queue.put((entry.path, current_depth + 1))

文章作者: Mealsee
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Mealsee !
 上一篇
2025-12-22 Mealsee
下一篇 
2025-12-17 Mealsee
  目录