分布式任务调度与资源管理系统设计


一、系统概述

本系统是一个面向内容检测场景的分布式异步任务调度与资源管理平台,核心提供三方面能力:

  • 任务调度:基于设备维度的逻辑队列 + 老化优先级调度,实现多租户公平调度与饥饿感知
  • 资源调度:Worker 节点资源注册与心跳,CPU/GPU/内存感知的任务分发
  • 稳定性保障:过载保护、背压机制、指数退避、死信队列、任务 TTL 过期等多层可靠性策略

系统基于 Flask + Celery + Redis + PostgreSQL 技术栈构建,采用三层架构(Web 路由层 → 业务服务层 → 异步任务层),所有异步任务通过 Celery 分发至专用队列,由 Worker 集群消费。


二、整体架构

┌─────────────────────────────────────────────────────────┐
│                     Web 路由层 (Flask)                    │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │  检测结果上报   │  │  图片处理任务   │  │  文本处理任务   │  │
│  │  /api/decter  │  │  /api/ocr/*  │  │  /api/ocr/*  │  │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘  │
└─────────┼──────────────────┼──────────────────┼──────────┘
          │                  │                  │
          ▼                  ▼                  ▼
┌─────────────────────────────────────────────────────────┐
│                    服务层 (Services)                       │
│  ┌────────────────┐  ┌──────────────────────────────┐   │
│  │  检测结果处理服务  │  │  任务处理服务                  │   │
│  │  (同步处理)      │  │  (异步入队 + 过载检查)         │   │
│  └────────────────┘  └──────────┬───────────────────┘   │
│                                  │                       │
│  ┌───────────────────────────────▼────────────────────┐  │
│  │            MAC 队列调度服务                          │  │
│  │  按设备维度逻辑队列 · 老化优先级排序 · 分布式互斥锁    │  │
│  │  指数退避 · 死信 · 饥饿检测 · 动态批大小              │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────┐
│                   异步任务层 (Celery + Redis)              │
│                                                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │ 调度队列      │  │ 图片处理队列    │  │ 文本处理队列    │  │
│  │    dispatch  │  │    ocr      │  │    text      │  │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘  │
│         │                 │                 │           │
│         ▼                 ▼                 ▼           │
│  ┌──────────────────────────────────────────────────┐   │
│  │              Worker 集群                          │   │
│  │  GPU Worker (消费所有队列) · CPU Worker (消费部分) │   │
│  │  Worker 注册/心跳 · 过载自检 · 自适应 CPU/GPU    │   │
│  └──────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────┐
│                  数据持久层 (PostgreSQL)                   │
│  任务记录 · 处理结果 · 统计报表                           │
└─────────────────────────────────────────────────────────┘

三、任务调度系统

3.1 核心调度模型

系统采用两级调度模型:

第一级:全局调度器 (Celery Beat + dispatch Task)
  → 从 Sorted Set 中选取最优 MAC 设备
  → 加分布式互斥锁防止多 Worker 冲突

第二级:设备级调度器 (process_mac_sensitive_tasks Task)
  → 从该设备的 Redis List 中批量拉取任务
  → Worker 过载自检(背压)
  → 提交具体子任务到专用队列
  → 处理完毕后释放锁,触发下一轮全局调度

3.2 按设备维度的逻辑队列

每个设备拥有独立的 Redis List 作为任务队列:

  • 队列键格式sensitive:mac:queue:{normalized_mac}
  • 元数据键格式sensitive:mac:meta:{normalized_mac}(存储积压数、最早任务时间、退避状态)
  • 入队操作:RPUSH + 原子递增全局计数器 + 更新调度分数
  • 出队操作:RPOP + 原子递减全局计数器 + 惰性过期检查

每设备独立队列的设计带来以下优势:

优势说明
故障隔离单设备大量任务不影响其他设备的调度
公平调度每个设备独立计算调度优先级,防止少数设备占满所有 Worker
退避隔离设备级指数退避,失败设备自动降权不影响正常设备

3.3 老化优先级调度

使用 Redis Sorted Set(sensitive:mac:schedule)维护所有活跃设备的调度优先级:

调度分数 = min(等待秒数, CELERY_MAC_AGING_MAX_WAIT_SECONDS) × CELERY_MAC_AGING_WEIGHT

关键特性:

  • 等待越久优先级越高:等待时间长的设备获得更高分数,ZREVRANGE 获取最高分设备
  • 等待时间上限CELERY_MAC_AGING_MAX_WAIT_SECONDS(默认 600s)防止极端情况
  • 元数据缓存:最早任务的 created_at 缓存在 meta hash 中,避免反复读取队列头部
  • 惰性更新:每次入队/出队/心跳时触发 refresh_mac_schedule_score() 更新分数

3.4 调度流程详解

1. Beat 触发调度(周期 15s)
   │
   ▼
2. dispatch_next_mac_sensitive_tasks
   │  ├─ 恢复退避到期的 MAC
   │  └─ claim_next_schedulable_mac()
   │       ├─ 从 Sorted Set 中 ZREVRANGE 遍历候选 MAC
   │       ├─ 跳过退避中的 MAC(check unschedulable_until)
   │       └─ try_acquire_mac_dispatch() — SET NX + TTL 加锁
   │
   ▼
3. process_mac_sensitive_tasks(mac)
   │  ├─ 计算动态批大小(get_dynamic_batch_size)
   │  │    ├─ 有饥饿设备 → 最小批大小(默认 1)
   │  │    └─ 无饥饿设备 → 正常批大小(默认 10)
   │  │
   │  ├─ 循环处理批内任务
   │  │    ├─ Worker 过载自检(背压)
   │  │    ├─ 刷新 MAC 运行锁 TTL
   │  │    ├─ pop_next_mac_task() — 惰性过期检查
   │  │    ├─ 无可用 GPU Worker → 执行 defer/死信策略
   │  │    └─ 提交子任务到专用队列
   │  │
   │  └─ finally:
   │       ├─ 释放 MAC 运行锁
   │       ├─ 更新调度分数 / 移除空队列
   │       └─ 续投全局调度器(有饥饿时传 exclude_mac)
   │
   ▼
4. execute_ocr_sensitive_job / execute_text_sensitive_job
   │  子任务在专用队列上由 Worker 消费
   │  ├─ 过载自检(retry 背压)
   │  ├─ 执行具体处理逻辑
   │  └─ 结果持久化到 PostgreSQL

3.5 饥饿检测与动态批大小

饥饿检测机制确保没有设备被长期忽略:

  • 判定条件:下一个待调度设备的等待时间 ≥ CELERY_MAC_STARVATION_HINT_SECONDS(默认 30s)
  • 响应动作:当前设备的批大小从默认值(10)缩减为最小值(1),让出调度机会
  • 续投机制:处理完当前设备后,若有饥饿设备存在,全局调度器以 exclude_mac 参数续投,跳过刚处理完的设备
def get_dynamic_batch_size(current_mac):
    if has_starved_mac(exclude_mac=current_mac):
        return max(1, CELERY_MAC_SCHEDULER_MIN_BATCH_SIZE)  # 默认 1
    return max(1, CELERY_MAC_SCHEDULER_BATCH_SIZE)           # 默认 10

3.6 分布式互斥锁

同一设备可能被多个 Worker 同时调度,需互斥保证:

  • 加锁SET sensitive:mac:active:{mac} {timestamp} NX EX {TTL}
  • 续约:处理期间定期 EXPIRE 刷新 TTL
  • 解锁:处理完成后 DEL 删除
  • 锁超时:默认 900s(CELERY_MAC_ACTIVE_TTL),防止 Worker 崩溃导致死锁

四、资源调度系统

4.1 Worker 资源注册

每个 Worker 启动时通过 @signals.worker_ready 钩子自动注册资源信息到 Redis:

HSET sensitive:worker:{hostname}:{pid}
    hostname         "worker-01"
    pid              "12345"
    gpu_available    "1"           # PaddlePaddle CUDA 检测
    gpu_util         "23.5"        # GPUtil
    gpu_memory_util  "45.2"        # GPUtil
    cpu_count        "16"
    cpu_percent      "35.0"        # psutil
    memory_total     "33559728128"
    memory_available "12559728128"
    memory_percent   "62.5"        # psutil
    active_tasks     "0"
    last_heartbeat   "2026-06-18T10:00:00+00:00"
EXPIRE sensitive:worker:{hostname}:{pid} 60

4.2 Worker 心跳

  • 周期:15s(与 Beat 调度周期一致)
  • 更新内容:CPU 使用率、内存使用率、GPU 利用率、GPU 显存利用率
  • TTL:60s(允许连续 3 次心跳丢失才判定死亡)
  • 扫描方式SCAN 非阻塞扫描,避免 KEYS * 阻塞

4.3 资源感知的任务分发

系统根据 Worker 资源情况智能分发任务:

任务类型目标队列资源要求调度策略
图片处理任务(OCR)sensitive.ocr无硬性要求(可 CPU 可 GPU)检查是否有任意可用 Worker
文本处理任务sensitive.text无特殊要求直接投递到队列
调度任务sensitive.dispatch无特殊要求仅在调度器队列运行

Worker 可用性过滤条件(get_available_workers):

  1. Worker 的 Redis key 存在(TTL 未过期)
  2. 内存使用率 < CELERY_WORKER_MEM_HIGH_WATERMARK(默认 75%)
  3. CPU 使用率 < CELERY_WORKER_CPU_HIGH_WATERMARK(默认 75%)

4.4 自适应 CPU/GPU 模式

Worker 在处理图片任务时,OCR 引擎根据运行时环境自动适配:

Worker 启动 → is_cuda_available() 检测
    ├─ 有 GPU → use_gpu=True, rec_batch_num=4, enable_mkldnn=False
    └─ 无 GPU → use_gpu=False, rec_batch_num=2, enable_mkldnn=True, cpu_threads=4

运行时发生 OOM/CUDA 错误时自动降级:

GPU OOM → 标记 _is_cuda=False → 重置 OCR 引擎 → 下次以 CPU 模式重初始化

五、稳定性保障

5.1 过载保护(多层防线)

请求入口
   │
   ├─ 防线一:全局积压限流
   │    ├─ 全局积压 > CELERY_MAX_TOTAL_PENDING(默认 5000)→ 拒绝入队
   │    └─ 返回 HTTP 503 + retry_after: 30
   │
   ├─ 防线二:单设备积压限流
   │    ├─ 单设备队列 > CELERY_MAX_PER_MAC_PENDING(默认 200)
   │    └─ 标记为过载设备(sensitive:overloaded_macs)
   │
   ├─ 防线三:Worker 过载自检(调度时)
   │    ├─ CPU > 75% → 暂停拉取新任务
   │    ├─ 内存 > 75% → 暂停拉取新任务
   │    └─ GPU 显存 > 75% → 暂停拉取新任务
   │
   └─ 防线四:子任务级过载自检
        ├─ CPU 过载 → self.retry(countdown=30)
        ├─ 内存过载 → self.retry(countdown=30)
        └─ GPU 显存过载 → self.retry(countdown=30)

过载状态判断(is_queue_overloaded)基于 O(1) 的 Redis 计数器和集合:

def is_queue_overloaded():
    # O(1) 检查,无阻塞
    total_pending = GET "sensitive:global_pending"
    if total_pending > CELERY_MAX_TOTAL_PENDING:
        return True
    overloaded_count = SCARD "sensitive:overloaded_macs"
    if overloaded_count > 0:
        return True
    return False

5.2 背压机制(Backpressure)

Worker 侧主动背压防止资源耗尽:

  1. 调度时背压process_mac_sensitive_tasks 循环中每次迭代检查本地 CPU/内存/GPU,超阈值立即 break
  2. 执行时背压execute_ocr_sensitive_job / execute_text_sensitive_job 启动时过载自检,超阈值则 self.retry(countdown=N)
  3. 任务降级:无 GPU Worker 可用时的三种策略:
策略行为适用场景
reject直接丢弃,记录失败GPU 完全不可用,不接受降级
defer指数退避后重试期望 GPU 资源即将恢复
skip_ocr跳过 OCR 处理业务允许降级

5.3 指数退避(Exponential Backoff)

设备级退避机制避免频繁重试打满系统:

backoff_seconds = min(
    CELERY_MAC_DEFER_BACKOFF_BASE × 2^(defer_count - 1),  # 30, 60, 120, 240, 480...
    CELERY_MAC_DEFER_BACKOFF_MAX  # 600s cap
)

退避效果:

退避次数退避时长累计等待
130s30s
260s90s
3120s210s
4240s450s
5480s930s (阈值) → 死信

退避 MAC 通过 unschedulable_until 时间戳记录在 meta hash 中,调度器在 claim_next_schedulable_mac() 中跳过,退避到期后由 _resume_expired_backoff_macs() 自动恢复。

5.4 死信队列(Dead Letter Queue)

超过最大重试次数的任务移入死信队列:

LPUSH sensitive:dead_letter {
    task_id, mac, task_type,
    error: "defer 次数超过阈值",
    requeue_count: 6,
    created_at, dead_at
}
LTRIM sensitive:dead_letter 0 9999  # 保留最近 10000 条

死信队列通过 LTRIM 限制大小,避免无限制增长耗尽 Redis 内存。

5.5 任务 TTL 过期机制

所有任务携带 created_at 时间戳,确保不会无限排队:

机制触发方式说明
惰性过期pop_next_mac_task() 出队时消费时判断 TTL,过期则丢弃并继续取下一条
定时清理Beat 每 300s 触发 purge_expired_mac_tasks使用 LRANGE 扫描队列头部最多 100 条,LTRIM 批量删除
def _is_task_expired(payload):
    elapsed = now - created_at
    return elapsed > CELERY_MAC_TASK_TTL_SECONDS  # 默认 3600s

5.6 水位监控(Watermark)

定期发布系统水位到 Redis,支撑弹性扩缩容决策:

SET sensitive:watermark {
    total_pending:    1234,     # 全局积压数
    mac_count:        42,       # 活跃设备数
    active_workers:   8,        # 存活 Worker 数
    timestamp:        "2026-06-18T10:00:00"
}
EXPIRE sensitive:watermark 20

发布周期:CELERY_WATERMARK_PUBLISH_INTERVAL(默认 15s),扫描使用 SCAN 非阻塞模式。


六、系统配置总览

6.1 核心配置项

配置项默认值说明
CELERY_MAX_TOTAL_PENDING5000全局最大积压任务数
CELERY_MAX_PER_MAC_PENDING200单设备最大积压任务数
CELERY_WORKER_CPU_HIGH_WATERMARK75%Worker CPU 过载阈值
CELERY_WORKER_MEM_HIGH_WATERMARK75%Worker 内存过载阈值
CELERY_WORKER_GPU_MEM_HIGH_WATERMARK75%Worker GPU 显存过载阈值
CELERY_MAC_ACTIVE_TTL900s设备运行锁超时时间
CELERY_MAC_TASK_TTL_SECONDS3600s任务存活时间
CELERY_MAC_AGING_MAX_WAIT_SECONDS600s老化等待时间上限
CELERY_MAC_STARVATION_HINT_SECONDS30s饥饿判定阈值
CELERY_MAC_SCHEDULER_BATCH_SIZE10正常批处理大小
CELERY_MAC_SCHEDULER_MIN_BATCH_SIZE1饥饿时最小批处理大小
OCR_FALLBACK_STRATEGYreject降级策略
CELERY_OCR_MAX_DEFER_COUNT5最大退避次数
CELERY_MAC_DEFER_BACKOFF_BASE30s退避基时
CELERY_MAC_DEFER_BACKOFF_MAX600s退避最大时长
CELERY_WORKER_TTL60sWorker 心跳过期时间
CELERY_WATERMARK_PUBLISH_INTERVAL15s水位发布间隔
CELERY_MAC_AGING_WEIGHT1.0老化权重系数

6.2 Celery 配置

配置项
BrokerRedis
SerializerJSON
task_acks_lateTrue
task_reject_on_worker_lostTrue
worker_prefetch_multiplier1
result_expires180s
ignore_resultTrue

6.3 PostgreSQL 连接池

配置项
pool_size10
max_overflow20
pool_timeout30s
pool_recycle1800s

七、定时任务机制

系统通过 Celery Beat 驱动三个核心定时任务,保障分布式系统的自运维能力:

任务周期说明
worker_heartbeat15sWorker 心跳更新,维持资源感知调度的基础数据
publish_watermark15s队列水位发布,输出系统运行状态快照
purge_expired_mac_tasks300s过期任务主动清理,防止积压任务耗尽内存

这些定时任务与事件驱动流程(入队、出队、退避恢复)共同构成系统的完整调度闭环。


八、数据流完整链路

┌──────────┐    ┌──────────┐    ┌────────────┐    ┌──────────┐
│   客户端   │───>│  Web 路由 │───>│  Service   │───>│ Redis    │
│          │    │  /api/*  │    │ 入队+过载检查 │    │ MAC 队列  │
└──────────┘    └──────────┘    └────────────┘    └─────┬────┘
                                                         │
                    ┌────────────────────────────────────┘
                    │
                    ▼
           ┌─────────────────┐
           │  Celery Beat     │
           │  (定时触发调度器)  │
           └────────┬────────┘
                    │
                    ▼
           ┌─────────────────┐
           │  全局调度器       │
           │  选取最优 MAC     │
           │  加分布式锁       │
           └────────┬────────┘
                    │
                    ▼
           ┌─────────────────┐
           │  设备级调度器     │
           │  批量拉取任务     │
           │  Worker 背压     │
           │  提交子任务       │
           └──┬──────────┬───┘
              │          │
              ▼          ▼
    ┌────────────┐ ┌────────────┐
    │ OCR 子任务  │ │ 文本子任务   │
    │ GPU/CPU    │ │ CPU        │
    │ OCR 识别   │ │ 敏感词匹配  │
    │ 敏感词匹配  │ │ 结果入库    │
    │ 结果入库    │ │            │
    └────────────┘ └────────────┘

九、关键设计决策

决策选择理由
设备维度队列Redis List简单可靠,LPUSH/RPOP O(1) 操作
调度优先级Redis Sorted Set天然支持按分数排序,ZREVRANGE O(log N)
分布式锁Redis SET NX + TTL无额外依赖,TTL 防死锁
全局计数器Redis INCR/DECRO(1) 操作,高并发友好
Worker 注册Redis Hash + TTLTTL 自动清理死 Worker
过载检测psutil + GPUtil已在依赖中,零额外成本
子任务拆分独立 Celery Task避免调度 Task 阻塞,支持路由到不同队列
扫描而非 KEYSSCAN 游标避免 Redis 单线程阻塞
死信队列LPUSH + LTRIM简单可靠,限制大小防内存泄漏

十、可观测性设计

10.1 关键可观测指标

系统在设计上内置了丰富的可观测性指标,以支撑运行时状态推断:

指标数据来源采集复杂度
全局积压数Redis sensitive:global_pendingO(1),原子计数器
设备活跃数ZSET sensitive:mac:schedule 大小O(1) ZCARD
存活 Worker 数SCAN sensitive:worker:*非阻塞游标扫描
过载设备数SET sensitive:overloaded_macs 大小O(1) SCARD
死信队列深度sensitive:dead_letter 长度O(1) LLEN
队列水位快照sensitive:watermark JSONO(1) GET

10.2 异常场景的容错设计

场景设计策略
Worker 进程崩溃Celery acks_late=True + worker_lost 自动重新入队,保证 at-least-once 语义
Worker 集体不可用任务安全积压于 Redis 队列,Worker 恢复后自动继续消费,无人工介入需要
Redis 实例故障任务数据丢失(无持久化),系统设计上接受此风险,依赖上游重试补偿
单 Worker 内存泄漏Worker 进程级别隔离,单个 Worker OOM 不影响集群中其他 Worker
调度器进程崩溃Beat 与 Worker 进程分离部署,Beat 重启后按周期恢复调度,不丢失调度状态
资源竞争条件分布式锁 + defer 重试计数 + 死信队列三层兜底,防止任务无限卡死

声明:一代明君的小屋|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - 分布式任务调度与资源管理系统设计


欢迎来到我的小屋