一、系统概述
本系统是一个面向内容检测场景的分布式异步任务调度与资源管理平台,核心提供三方面能力:
- 任务调度:基于设备维度的逻辑队列 + 老化优先级调度,实现多租户公平调度与饥饿感知
- 资源调度:Worker 节点资源注册与心跳,CPU/GPU/内存感知的任务分发
- 稳定性保障:过载保护、背压机制、指数退避、死信队列、任务 TTL 过期等多层可靠性策略
系统基于 Flask + Celery + Redis + PostgreSQL 技术栈构建,采用三层架构(Web 路由层 → 业务服务层 → 异步任务层),所有异步任务通过 Celery 分发至专用队列,由 Worker 集群消费。
二、整体架构
┌─────────────────────────────────────────────────────────┐
│ Web 路由层 (Flask) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 检测结果上报 │ │ 图片处理任务 │ │ 文本处理任务 │ │
│ │ /api/decter │ │ /api/ocr/* │ │ /api/ocr/* │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼──────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────┐
│ 服务层 (Services) │
│ ┌────────────────┐ ┌──────────────────────────────┐ │
│ │ 检测结果处理服务 │ │ 任务处理服务 │ │
│ │ (同步处理) │ │ (异步入队 + 过载检查) │ │
│ └────────────────┘ └──────────┬───────────────────┘ │
│ │ │
│ ┌───────────────────────────────▼────────────────────┐ │
│ │ MAC 队列调度服务 │ │
│ │ 按设备维度逻辑队列 · 老化优先级排序 · 分布式互斥锁 │ │
│ │ 指数退避 · 死信 · 饥饿检测 · 动态批大小 │ │
│ └───────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 异步任务层 (Celery + Redis) │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 调度队列 │ │ 图片处理队列 │ │ 文本处理队列 │ │
│ │ dispatch │ │ ocr │ │ text │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Worker 集群 │ │
│ │ GPU Worker (消费所有队列) · CPU Worker (消费部分) │ │
│ │ Worker 注册/心跳 · 过载自检 · 自适应 CPU/GPU │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 数据持久层 (PostgreSQL) │
│ 任务记录 · 处理结果 · 统计报表 │
└─────────────────────────────────────────────────────────┘三、任务调度系统
3.1 核心调度模型
系统采用两级调度模型:
第一级:全局调度器 (Celery Beat + dispatch Task)
→ 从 Sorted Set 中选取最优 MAC 设备
→ 加分布式互斥锁防止多 Worker 冲突
第二级:设备级调度器 (process_mac_sensitive_tasks Task)
→ 从该设备的 Redis List 中批量拉取任务
→ Worker 过载自检(背压)
→ 提交具体子任务到专用队列
→ 处理完毕后释放锁,触发下一轮全局调度3.2 按设备维度的逻辑队列
每个设备拥有独立的 Redis List 作为任务队列:
- 队列键格式:
sensitive:mac:queue:{normalized_mac} - 元数据键格式:
sensitive:mac:meta:{normalized_mac}(存储积压数、最早任务时间、退避状态) - 入队操作:RPUSH + 原子递增全局计数器 + 更新调度分数
- 出队操作:RPOP + 原子递减全局计数器 + 惰性过期检查
每设备独立队列的设计带来以下优势:
| 优势 | 说明 |
|---|---|
| 故障隔离 | 单设备大量任务不影响其他设备的调度 |
| 公平调度 | 每个设备独立计算调度优先级,防止少数设备占满所有 Worker |
| 退避隔离 | 设备级指数退避,失败设备自动降权不影响正常设备 |
3.3 老化优先级调度
使用 Redis Sorted Set(sensitive:mac:schedule)维护所有活跃设备的调度优先级:
调度分数 = min(等待秒数, CELERY_MAC_AGING_MAX_WAIT_SECONDS) × CELERY_MAC_AGING_WEIGHT关键特性:
- 等待越久优先级越高:等待时间长的设备获得更高分数,ZREVRANGE 获取最高分设备
- 等待时间上限:
CELERY_MAC_AGING_MAX_WAIT_SECONDS(默认 600s)防止极端情况 - 元数据缓存:最早任务的
created_at缓存在 meta hash 中,避免反复读取队列头部 - 惰性更新:每次入队/出队/心跳时触发
refresh_mac_schedule_score()更新分数
3.4 调度流程详解
1. Beat 触发调度(周期 15s)
│
▼
2. dispatch_next_mac_sensitive_tasks
│ ├─ 恢复退避到期的 MAC
│ └─ claim_next_schedulable_mac()
│ ├─ 从 Sorted Set 中 ZREVRANGE 遍历候选 MAC
│ ├─ 跳过退避中的 MAC(check unschedulable_until)
│ └─ try_acquire_mac_dispatch() — SET NX + TTL 加锁
│
▼
3. process_mac_sensitive_tasks(mac)
│ ├─ 计算动态批大小(get_dynamic_batch_size)
│ │ ├─ 有饥饿设备 → 最小批大小(默认 1)
│ │ └─ 无饥饿设备 → 正常批大小(默认 10)
│ │
│ ├─ 循环处理批内任务
│ │ ├─ Worker 过载自检(背压)
│ │ ├─ 刷新 MAC 运行锁 TTL
│ │ ├─ pop_next_mac_task() — 惰性过期检查
│ │ ├─ 无可用 GPU Worker → 执行 defer/死信策略
│ │ └─ 提交子任务到专用队列
│ │
│ └─ finally:
│ ├─ 释放 MAC 运行锁
│ ├─ 更新调度分数 / 移除空队列
│ └─ 续投全局调度器(有饥饿时传 exclude_mac)
│
▼
4. execute_ocr_sensitive_job / execute_text_sensitive_job
│ 子任务在专用队列上由 Worker 消费
│ ├─ 过载自检(retry 背压)
│ ├─ 执行具体处理逻辑
│ └─ 结果持久化到 PostgreSQL3.5 饥饿检测与动态批大小
饥饿检测机制确保没有设备被长期忽略:
- 判定条件:下一个待调度设备的等待时间 ≥
CELERY_MAC_STARVATION_HINT_SECONDS(默认 30s) - 响应动作:当前设备的批大小从默认值(10)缩减为最小值(1),让出调度机会
- 续投机制:处理完当前设备后,若有饥饿设备存在,全局调度器以
exclude_mac参数续投,跳过刚处理完的设备
def get_dynamic_batch_size(current_mac):
if has_starved_mac(exclude_mac=current_mac):
return max(1, CELERY_MAC_SCHEDULER_MIN_BATCH_SIZE) # 默认 1
return max(1, CELERY_MAC_SCHEDULER_BATCH_SIZE) # 默认 103.6 分布式互斥锁
同一设备可能被多个 Worker 同时调度,需互斥保证:
- 加锁:
SET sensitive:mac:active:{mac} {timestamp} NX EX {TTL} - 续约:处理期间定期
EXPIRE刷新 TTL - 解锁:处理完成后
DEL删除 - 锁超时:默认 900s(
CELERY_MAC_ACTIVE_TTL),防止 Worker 崩溃导致死锁
四、资源调度系统
4.1 Worker 资源注册
每个 Worker 启动时通过 @signals.worker_ready 钩子自动注册资源信息到 Redis:
HSET sensitive:worker:{hostname}:{pid}
hostname "worker-01"
pid "12345"
gpu_available "1" # PaddlePaddle CUDA 检测
gpu_util "23.5" # GPUtil
gpu_memory_util "45.2" # GPUtil
cpu_count "16"
cpu_percent "35.0" # psutil
memory_total "33559728128"
memory_available "12559728128"
memory_percent "62.5" # psutil
active_tasks "0"
last_heartbeat "2026-06-18T10:00:00+00:00"
EXPIRE sensitive:worker:{hostname}:{pid} 604.2 Worker 心跳
- 周期:15s(与 Beat 调度周期一致)
- 更新内容:CPU 使用率、内存使用率、GPU 利用率、GPU 显存利用率
- TTL:60s(允许连续 3 次心跳丢失才判定死亡)
- 扫描方式:
SCAN非阻塞扫描,避免KEYS *阻塞
4.3 资源感知的任务分发
系统根据 Worker 资源情况智能分发任务:
| 任务类型 | 目标队列 | 资源要求 | 调度策略 |
|---|---|---|---|
| 图片处理任务(OCR) | sensitive.ocr | 无硬性要求(可 CPU 可 GPU) | 检查是否有任意可用 Worker |
| 文本处理任务 | sensitive.text | 无特殊要求 | 直接投递到队列 |
| 调度任务 | sensitive.dispatch | 无特殊要求 | 仅在调度器队列运行 |
Worker 可用性过滤条件(get_available_workers):
- Worker 的 Redis key 存在(TTL 未过期)
- 内存使用率 <
CELERY_WORKER_MEM_HIGH_WATERMARK(默认 75%) - CPU 使用率 <
CELERY_WORKER_CPU_HIGH_WATERMARK(默认 75%)
4.4 自适应 CPU/GPU 模式
Worker 在处理图片任务时,OCR 引擎根据运行时环境自动适配:
Worker 启动 → is_cuda_available() 检测
├─ 有 GPU → use_gpu=True, rec_batch_num=4, enable_mkldnn=False
└─ 无 GPU → use_gpu=False, rec_batch_num=2, enable_mkldnn=True, cpu_threads=4运行时发生 OOM/CUDA 错误时自动降级:
GPU OOM → 标记 _is_cuda=False → 重置 OCR 引擎 → 下次以 CPU 模式重初始化五、稳定性保障
5.1 过载保护(多层防线)
请求入口
│
├─ 防线一:全局积压限流
│ ├─ 全局积压 > CELERY_MAX_TOTAL_PENDING(默认 5000)→ 拒绝入队
│ └─ 返回 HTTP 503 + retry_after: 30
│
├─ 防线二:单设备积压限流
│ ├─ 单设备队列 > CELERY_MAX_PER_MAC_PENDING(默认 200)
│ └─ 标记为过载设备(sensitive:overloaded_macs)
│
├─ 防线三:Worker 过载自检(调度时)
│ ├─ CPU > 75% → 暂停拉取新任务
│ ├─ 内存 > 75% → 暂停拉取新任务
│ └─ GPU 显存 > 75% → 暂停拉取新任务
│
└─ 防线四:子任务级过载自检
├─ CPU 过载 → self.retry(countdown=30)
├─ 内存过载 → self.retry(countdown=30)
└─ GPU 显存过载 → self.retry(countdown=30)过载状态判断(is_queue_overloaded)基于 O(1) 的 Redis 计数器和集合:
def is_queue_overloaded():
# O(1) 检查,无阻塞
total_pending = GET "sensitive:global_pending"
if total_pending > CELERY_MAX_TOTAL_PENDING:
return True
overloaded_count = SCARD "sensitive:overloaded_macs"
if overloaded_count > 0:
return True
return False5.2 背压机制(Backpressure)
Worker 侧主动背压防止资源耗尽:
- 调度时背压:
process_mac_sensitive_tasks循环中每次迭代检查本地 CPU/内存/GPU,超阈值立即 break - 执行时背压:
execute_ocr_sensitive_job/execute_text_sensitive_job启动时过载自检,超阈值则self.retry(countdown=N) - 任务降级:无 GPU Worker 可用时的三种策略:
| 策略 | 行为 | 适用场景 |
|---|---|---|
reject | 直接丢弃,记录失败 | GPU 完全不可用,不接受降级 |
defer | 指数退避后重试 | 期望 GPU 资源即将恢复 |
skip_ocr | 跳过 OCR 处理 | 业务允许降级 |
5.3 指数退避(Exponential Backoff)
设备级退避机制避免频繁重试打满系统:
backoff_seconds = min(
CELERY_MAC_DEFER_BACKOFF_BASE × 2^(defer_count - 1), # 30, 60, 120, 240, 480...
CELERY_MAC_DEFER_BACKOFF_MAX # 600s cap
)退避效果:
| 退避次数 | 退避时长 | 累计等待 |
|---|---|---|
| 1 | 30s | 30s |
| 2 | 60s | 90s |
| 3 | 120s | 210s |
| 4 | 240s | 450s |
| 5 | 480s | 930s (阈值) → 死信 |
退避 MAC 通过 unschedulable_until 时间戳记录在 meta hash 中,调度器在 claim_next_schedulable_mac() 中跳过,退避到期后由 _resume_expired_backoff_macs() 自动恢复。
5.4 死信队列(Dead Letter Queue)
超过最大重试次数的任务移入死信队列:
LPUSH sensitive:dead_letter {
task_id, mac, task_type,
error: "defer 次数超过阈值",
requeue_count: 6,
created_at, dead_at
}
LTRIM sensitive:dead_letter 0 9999 # 保留最近 10000 条死信队列通过 LTRIM 限制大小,避免无限制增长耗尽 Redis 内存。
5.5 任务 TTL 过期机制
所有任务携带 created_at 时间戳,确保不会无限排队:
| 机制 | 触发方式 | 说明 |
|---|---|---|
| 惰性过期 | pop_next_mac_task() 出队时 | 消费时判断 TTL,过期则丢弃并继续取下一条 |
| 定时清理 | Beat 每 300s 触发 purge_expired_mac_tasks | 使用 LRANGE 扫描队列头部最多 100 条,LTRIM 批量删除 |
def _is_task_expired(payload):
elapsed = now - created_at
return elapsed > CELERY_MAC_TASK_TTL_SECONDS # 默认 3600s5.6 水位监控(Watermark)
定期发布系统水位到 Redis,支撑弹性扩缩容决策:
SET sensitive:watermark {
total_pending: 1234, # 全局积压数
mac_count: 42, # 活跃设备数
active_workers: 8, # 存活 Worker 数
timestamp: "2026-06-18T10:00:00"
}
EXPIRE sensitive:watermark 20发布周期:CELERY_WATERMARK_PUBLISH_INTERVAL(默认 15s),扫描使用 SCAN 非阻塞模式。
六、系统配置总览
6.1 核心配置项
| 配置项 | 默认值 | 说明 |
|---|---|---|
CELERY_MAX_TOTAL_PENDING | 5000 | 全局最大积压任务数 |
CELERY_MAX_PER_MAC_PENDING | 200 | 单设备最大积压任务数 |
CELERY_WORKER_CPU_HIGH_WATERMARK | 75% | Worker CPU 过载阈值 |
CELERY_WORKER_MEM_HIGH_WATERMARK | 75% | Worker 内存过载阈值 |
CELERY_WORKER_GPU_MEM_HIGH_WATERMARK | 75% | Worker GPU 显存过载阈值 |
CELERY_MAC_ACTIVE_TTL | 900s | 设备运行锁超时时间 |
CELERY_MAC_TASK_TTL_SECONDS | 3600s | 任务存活时间 |
CELERY_MAC_AGING_MAX_WAIT_SECONDS | 600s | 老化等待时间上限 |
CELERY_MAC_STARVATION_HINT_SECONDS | 30s | 饥饿判定阈值 |
CELERY_MAC_SCHEDULER_BATCH_SIZE | 10 | 正常批处理大小 |
CELERY_MAC_SCHEDULER_MIN_BATCH_SIZE | 1 | 饥饿时最小批处理大小 |
OCR_FALLBACK_STRATEGY | reject | 降级策略 |
CELERY_OCR_MAX_DEFER_COUNT | 5 | 最大退避次数 |
CELERY_MAC_DEFER_BACKOFF_BASE | 30s | 退避基时 |
CELERY_MAC_DEFER_BACKOFF_MAX | 600s | 退避最大时长 |
CELERY_WORKER_TTL | 60s | Worker 心跳过期时间 |
CELERY_WATERMARK_PUBLISH_INTERVAL | 15s | 水位发布间隔 |
CELERY_MAC_AGING_WEIGHT | 1.0 | 老化权重系数 |
6.2 Celery 配置
| 配置项 | 值 |
|---|---|
| Broker | Redis |
| Serializer | JSON |
| task_acks_late | True |
| task_reject_on_worker_lost | True |
| worker_prefetch_multiplier | 1 |
| result_expires | 180s |
| ignore_result | True |
6.3 PostgreSQL 连接池
| 配置项 | 值 |
|---|---|
| pool_size | 10 |
| max_overflow | 20 |
| pool_timeout | 30s |
| pool_recycle | 1800s |
七、定时任务机制
系统通过 Celery Beat 驱动三个核心定时任务,保障分布式系统的自运维能力:
| 任务 | 周期 | 说明 |
|---|---|---|
worker_heartbeat | 15s | Worker 心跳更新,维持资源感知调度的基础数据 |
publish_watermark | 15s | 队列水位发布,输出系统运行状态快照 |
purge_expired_mac_tasks | 300s | 过期任务主动清理,防止积压任务耗尽内存 |
这些定时任务与事件驱动流程(入队、出队、退避恢复)共同构成系统的完整调度闭环。
八、数据流完整链路
┌──────────┐ ┌──────────┐ ┌────────────┐ ┌──────────┐
│ 客户端 │───>│ Web 路由 │───>│ Service │───>│ Redis │
│ │ │ /api/* │ │ 入队+过载检查 │ │ MAC 队列 │
└──────────┘ └──────────┘ └────────────┘ └─────┬────┘
│
┌────────────────────────────────────┘
│
▼
┌─────────────────┐
│ Celery Beat │
│ (定时触发调度器) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 全局调度器 │
│ 选取最优 MAC │
│ 加分布式锁 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 设备级调度器 │
│ 批量拉取任务 │
│ Worker 背压 │
│ 提交子任务 │
└──┬──────────┬───┘
│ │
▼ ▼
┌────────────┐ ┌────────────┐
│ OCR 子任务 │ │ 文本子任务 │
│ GPU/CPU │ │ CPU │
│ OCR 识别 │ │ 敏感词匹配 │
│ 敏感词匹配 │ │ 结果入库 │
│ 结果入库 │ │ │
└────────────┘ └────────────┘九、关键设计决策
| 决策 | 选择 | 理由 |
|---|---|---|
| 设备维度队列 | Redis List | 简单可靠,LPUSH/RPOP O(1) 操作 |
| 调度优先级 | Redis Sorted Set | 天然支持按分数排序,ZREVRANGE O(log N) |
| 分布式锁 | Redis SET NX + TTL | 无额外依赖,TTL 防死锁 |
| 全局计数器 | Redis INCR/DECR | O(1) 操作,高并发友好 |
| Worker 注册 | Redis Hash + TTL | TTL 自动清理死 Worker |
| 过载检测 | psutil + GPUtil | 已在依赖中,零额外成本 |
| 子任务拆分 | 独立 Celery Task | 避免调度 Task 阻塞,支持路由到不同队列 |
| 扫描而非 KEYS | SCAN 游标 | 避免 Redis 单线程阻塞 |
| 死信队列 | LPUSH + LTRIM | 简单可靠,限制大小防内存泄漏 |
十、可观测性设计
10.1 关键可观测指标
系统在设计上内置了丰富的可观测性指标,以支撑运行时状态推断:
| 指标 | 数据来源 | 采集复杂度 |
|---|---|---|
| 全局积压数 | Redis sensitive:global_pending | O(1),原子计数器 |
| 设备活跃数 | ZSET sensitive:mac:schedule 大小 | O(1) ZCARD |
| 存活 Worker 数 | SCAN sensitive:worker:* | 非阻塞游标扫描 |
| 过载设备数 | SET sensitive:overloaded_macs 大小 | O(1) SCARD |
| 死信队列深度 | sensitive:dead_letter 长度 | O(1) LLEN |
| 队列水位快照 | sensitive:watermark JSON | O(1) GET |
10.2 异常场景的容错设计
| 场景 | 设计策略 |
|---|---|
| Worker 进程崩溃 | Celery acks_late=True + worker_lost 自动重新入队,保证 at-least-once 语义 |
| Worker 集体不可用 | 任务安全积压于 Redis 队列,Worker 恢复后自动继续消费,无人工介入需要 |
| Redis 实例故障 | 任务数据丢失(无持久化),系统设计上接受此风险,依赖上游重试补偿 |
| 单 Worker 内存泄漏 | Worker 进程级别隔离,单个 Worker OOM 不影响集群中其他 Worker |
| 调度器进程崩溃 | Beat 与 Worker 进程分离部署,Beat 重启后按周期恢复调度,不丢失调度状态 |
| 资源竞争条件 | 分布式锁 + defer 重试计数 + 死信队列三层兜底,防止任务无限卡死 |


Comments | NOTHING