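1. Overview
The MAC-based device client task queue scheduling service is a Redis-backed asynchronous task scheduling middleware that isolates work per device identifier (MAC address). It addresses three core problems:
- Multi-tenant fair scheduling: many devices share a limited pool of workers, and every device's tasks must still be processed promptly
- Resource-aware scheduling: tasks differ in their CPU/GPU requirements, so dispatch must track worker resource usage and adapt to it
- Self-protection: a single misbehaving device must not be able to flood the system, and an overloaded system must not cascade into failure
The core idea is to maintain an independent logical queue per device, compute each device's scheduling score with aging-based priority, and use a distributed mutual-exclusion lock so that multiple workers can schedule fairly and safely.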
2. Data Model
All scheduler state lives in Redis. The diagram below shows the seven core data structures:
┌──────────────────────────────────────────────────────────────────┐
│                   MAC queue scheduling system                    │
│ ┌────────────────┬──────────────┬─────────────┬────────────────┐ │
│ │ Device queues  │ Metadata     │ Schedule    │ Global counter │ │
│ │ (List)         │ (Hash)       │ (ZSET)      │ (String)       │ │
│ │                │              │             │                │ │
│ │ task_mac_q:m1  │ meta:m1      │ schedule    │ global_pend    │ │
│ │ task_mac_q:m2  │ meta:m2      │ [m1:15.2]   │ → 128          │ │
│ │ task_mac_q:m3  │ meta:m3      │ [m2:42.5]   │                │ │
│ │ ...            │ ...          │ [m3:8.1]    │                │ │
│ └────────────────┴──────────────┴─────────────┴────────────────┘ │
│                                                                  │
│ ┌──────────────────┬──────────────────┬────────────────────────┐ │
│ │ Dispatch locks   │ Overloaded set   │ Dead-letter queue      │ │
│ │ (String)         │ (Set)            │ (List)                 │ │
│ │                  │                  │                        │ │
│ │ active:m1 → TTL  │ overloaded:      │ dead_letter            │ │
│ │ active:m2 → TTL  │ {m3, m7}         │ [{task1}, ...]         │ │
│ └──────────────────┴──────────────────┴────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
2.1 Data Structure Reference
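| Purpose | Redis type | Key format | Value |
|---|---|---|---|
| Device queue | List | sensitive:mac:queue:{mac} | JSON-serialized task payloads, in enqueue order |
| Device metadata | Hash | sensitive:mac:meta:{mac} | Fields: pending_count (backlog size), oldest_created_at (creation time of the oldest task), unschedulable_until (backoff deadline), updated_at (last update time) |
| Schedule set | ZSet | sensitive:mac:schedule | member = mac, score = aging-based scheduling score |
| Global counter | String | sensitive:global_pending | Total backlog across all devices (atomic counter) |
| Dispatch lock | String | sensitive:mac:active:{mac} | SET NX + TTL; prevents multiple workers from scheduling the same device at once |
| Overloaded set | Set | sensitive:overloaded_macs | Devices whose backlog exceeds the per-device threshold |
| Dead-letter queue | List | sensitive:dead_letter | Records of tasks that exceeded the maximum number of defers |
| Watermark snapshot | String | sensitive:watermark | Periodically published JSON snapshot of system state |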
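The helper below shows how the key layout in the table fits together for a single device. It is a minimal sketch: only the key formats come from the table, and the function name `mac_keys` is hypothetical.

```python
# Hypothetical helper: derives every Redis key listed in the 2.1 table for one device.
# Only the key formats come from the table; the helper itself is illustrative.
from typing import Dict

PREFIX = "sensitive"

def mac_keys(mac: str) -> Dict[str, str]:
    """Return the per-device keys plus the shared global keys."""
    return {
        "queue": f"{PREFIX}:mac:queue:{mac}",      # List: serialized task payloads
        "meta": f"{PREFIX}:mac:meta:{mac}",        # Hash: pending_count, oldest_created_at, ...
        "active": f"{PREFIX}:mac:active:{mac}",    # String: dispatch lock (SET NX + TTL)
        "schedule": f"{PREFIX}:mac:schedule",      # ZSet shared by all devices
        "global_pending": f"{PREFIX}:global_pending",
        "overloaded": f"{PREFIX}:overloaded_macs",
        "dead_letter": f"{PREFIX}:dead_letter",
        "watermark": f"{PREFIX}:watermark",
    }

if __name__ == "__main__":
    for role, key in mac_keys("aa:bb:cc:dd:ee:ff").items():
        print(f"{role:15} {key}")
```

Real MAC addresses contain colons. This is harmless as long as the MAC is recovered by stripping the known prefix (`key[len(prefix):]`), which is what the model code in section 5 does, rather than by splitting on `:`.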
3. Core Operations
3.1 Enqueue
Client request
   │
   ▼
Check global overload ──> overloaded ──> reject the enqueue, return 503
   │
   ▼ (normal)
Build the task payload (task_id, task_type, mac, created_at, ...)
   │
   ▼
Redis pipeline:
   ├─ RPUSH queue_key, serialized_payload   (append to the tail of the queue)
   ├─ HINCRBY meta_key pending_count 1      (increment the device backlog counter)
   └─ INCR sensitive:global_pending         (increment the global counter)
   │
   ▼
If this is the device's first pending task
   └─ HSET meta_key oldest_created_at = payload.created_at
   │
   ▼
If pending_count > per-device threshold
   └─ SADD sensitive:overloaded_macs mac
   │
   ▼
refresh_mac_schedule_score(mac)
   └─ recompute the device's scheduling score and update the ZSET
Key code:
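import json
def enqueue_payload(payload):
    # client (a redis.Redis instance), CELERY_MAX_PER_MAC_PENDING and
    # refresh_mac_schedule_score are assumed to be defined elsewhere in the service
    mac = payload["mac"]
    queue_key = f"sensitive:mac:queue:{mac}"
    meta_key = f"sensitive:mac:meta:{mac}"
    serialized_payload = json.dumps(payload, ensure_ascii=False)
    pipeline = client.pipeline()
    pipeline.rpush(queue_key, serialized_payload)
    pipeline.hincrby(meta_key, "pending_count", 1)
    pipeline.incr("sensitive:global_pending")
    _, pending_count, _ = pipeline.execute()
    if int(pending_count) == 1:
        # The first pending task is also the oldest one
        client.hset(meta_key, mapping={"oldest_created_at": payload["created_at"]})
    if int(pending_count) > CELERY_MAX_PER_MAC_PENDING:
        client.sadd("sensitive:overloaded_macs", mac)
    refresh_mac_schedule_score(mac)
3.2 Scheduling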
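The system uses a two-level scheduling model, implemented as chained Celery task calls inside the worker processes:
Level 1: global scheduler
Celery Beat (every 15 s)
   │
   ▼
dispatch_next_mac_sensitive_tasks
   ├─ _resume_expired_backoff_macs()      ← resume devices whose backoff has expired
   └─ claim_next_schedulable_mac()
         │
         ├─ ZREVRANGE schedule 0 -1       ← walk all candidates from highest to lowest score
         ├─ skip exclude_mac (the device that was just processed)
         ├─ skip devices still in backoff (check unschedulable_until)
         └─ try_acquire_mac_dispatch()    ← take the distributed lock with SET NX
               │
               ├─ success → remove the device from the ZSET → dispatch the process task
               └─ failure → try the next candidate
Level 2: per-device scheduler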
process_mac_sensitive_tasks(mac)
   │
   ├─ compute the dynamic batch size
   │     ├─ has_starved_mac()? → batch = MIN_BATCH (1)
   │     └─ no starvation      → batch = NORMAL_BATCH (10)
   │
   ├─ loop (batch_size iterations):
   │     ├─ _is_local_worker_overloaded()? → break (backpressure)
   │     ├─ refresh_mac_dispatch() → EXPIRE to renew the lock
   │     ├─ pop_next_mac_task()
   │     │     └─ LPOP from the queue → TTL check → return a valid task
   │     └─ submit a subtask to the queue matching task_type
   │
   └─ finally:
         ├─ release the dispatch lock (DEL active key)
         ├─ tasks remaining? → update the ZSET score; otherwise remove the device from the ZSET
         └─ any starved device? → re-dispatch the global scheduler (with exclude_mac)
3.3 Dequeue
pop_next_mac_task(mac)
   │
   ▼
LPOP queue_key   ← pop from the head of the queue (FIFO, since enqueue appends with RPUSH)
   │
   ▼
Queue empty (no task)?
   ├─ Yes → refresh_mac_schedule_score() → return None
   │
   └─ No → TTL expired?
         ├─ Expired → HINCRBY pending_count -1 → DECR global_pending
         │          → log it → continue (fetch the next task)
         │
         └─ Valid → HINCRBY pending_count -1
                  → DECR global_pending
                  → pending_count <= 0?
                       ├─ Yes → delete the meta key, remove the device from the ZSET
                       └─ No  → update oldest_created_at
                                → refresh_mac_schedule_score()
                  → pending_count <= per-device threshold?
                       └─ SREM overloaded_macs
                  → return payload
3.4 Defer and Exponential Backoff
When an OCR task is about to be dispatched but no GPU worker is available, the defer logic is triggered:
requeue_task_with_defer(payload)
   │
   ├─ defer_count = payload.defer_count + 1
   │
   ├─ defer_count > MAX_DEFER (5)?
   │     └─ Yes → LPUSH dead_letter + LTRIM 0 9999 → return False (dead-lettered)
   │
   └─ No → RPUSH queue_key (requeue)
         → pending_count +1 → global_pending +1
         → _apply_defer_backoff(mac, defer_count)
               │
               ├─ backoff = min(30 × 2^(defer_count - 1), 600)
               ├─ HSET meta_key unschedulable_until = now + backoff
               └─ ZREM schedule mac (remove the device from the schedule set)
               │
               ▼
         return True (requeued)
Backoff sequence:
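| Defer # | Backoff | Cumulative wait |
|---|---|---|
| 1 | 30 s | 30 s |
| 2 | 60 s | 90 s |
| 3 | 120 s | 210 s |
| 4 | 240 s | 450 s |
| 5 | 480 s | 930 s (a 6th defer sends the task to the dead-letter queue) |
With MAX_DEFER = 5, the 600 s cap is never reached. It only takes effect if MAX_DEFER is raised.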
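The table can be reproduced directly from the backoff formula. The sketch below uses the constants from the text; the function names are illustrative and are not the service's own helpers.

```python
# Reproduces the backoff table: Backoff(n) = min(BASE * 2**(n-1), BACKOFF_MAX).
# Once the defer count exceeds MAX_DEFER, the task is dead-lettered instead.
BASE = 30          # seconds
BACKOFF_MAX = 600  # seconds
MAX_DEFER = 5

def backoff_seconds(defer_count: int) -> int:
    """Backoff applied on the defer_count-th defer (1-based)."""
    return min(BASE * 2 ** (defer_count - 1), BACKOFF_MAX)

def backoff_schedule(max_defer: int = MAX_DEFER):
    """Return (defer #, backoff, cumulative wait) rows up to the dead-letter cutoff."""
    rows, total = [], 0
    for n in range(1, max_defer + 1):
        b = backoff_seconds(n)
        total += b
        rows.append((n, b, total))
    return rows

if __name__ == "__main__":
    for n, b, total in backoff_schedule():
        print(f"#{n}: backoff={b}s cumulative={total}s")
    print(f"defer #{MAX_DEFER + 1} -> dead letter")
```

Calling `backoff_schedule(7)` shows the cap at work. The 6th and 7th defers would both get 600 s, because 30 × 2^5 = 960 s exceeds the cap.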
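3.5 Task TTL Expiration
Two complementary expiration mechanisms keep tasks from piling up indefinitely:
Mechanism 1: lazy expiration (checked at dequeue time)
───────────────────────────
In pop_next_mac_task:
   if (now - created_at) > TTL (3600 s):
       discard the task, update the counters, fetch the next one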
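Lazy expiration can be illustrated without Redis. This is a minimal in-memory sketch that assumes a FIFO queue whose head holds the oldest task. A `collections.deque` stands in for the Redis List (`popleft` plays the role of LPOP), and timestamps are plain epoch seconds instead of ISO strings. The name `pop_valid` is hypothetical.

```python
# In-memory sketch of lazy expiration at dequeue time (deque ~ Redis List, popleft ~ LPOP).
from collections import deque
from typing import Deque, Optional, Tuple

TASK_TTL = 3600  # seconds, as in the text

def pop_valid(queue: Deque[dict], now: float, ttl: float = TASK_TTL) -> Tuple[Optional[dict], int]:
    """Pop from the head until a non-expired task is found.

    Returns (task or None, number of expired tasks discarded along the way).
    """
    dropped = 0
    while queue:
        task = queue.popleft()                # oldest task first (FIFO)
        if now - task["created_at"] > ttl:    # strictly older than the TTL
            dropped += 1                      # the real service decrements its counters here
            continue
        return task, dropped
    return None, dropped                      # queue drained

if __name__ == "__main__":
    now = 10_000.0
    q = deque([{"id": "a", "created_at": now - 4000}, {"id": "b", "created_at": now - 10}])
    print(pop_valid(q, now))
```

In the usage example, task `a` is 4000 s old and is discarded. Task `b` is returned along with a drop count of 1.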
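Mechanism 2: periodic batch cleanup (scan of all queues)
───────────────────────────
purge_expired_mac_tasks (runs every 300 s):
   get_all_mac_queues() → incremental SCAN over all queues (no blocking KEYS)
   for each mac:
       LRANGE the first 100 entries at the head
       count the leading run of expired tasks
       LTRIM to keep the unexpired part (bulk delete)
       update the global counter
3.6 Overload Protection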
is_queue_overloaded()
   ├─ GET "sensitive:global_pending" > 5000?  → True (global limit)
   └─ SCARD "sensitive:overloaded_macs" > 0?  → True (per-device limit: at least one device is over its threshold)
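Both checks reduce to a simple predicate. In the sketch below, the two Redis reads (GET and SCARD) are passed in as plain numbers, and the thresholds follow the text. The function names are illustrative.

```python
# Pure-function restatement of the overload check; Redis reads are replaced by arguments.
MAX_TOTAL_PENDING = 5000   # global backlog cap
MAX_PER_MAC_PENDING = 200  # per-device backlog cap

def is_overloaded(global_pending: int, overloaded_mac_count: int) -> bool:
    """True if the global backlog exceeds its cap or any device is over its own cap."""
    return global_pending > MAX_TOTAL_PENDING or overloaded_mac_count > 0

def is_mac_overloaded(pending_count: int) -> bool:
    """Membership rule for overloaded_macs: SADD above the cap, SREM at or below it."""
    return pending_count > MAX_PER_MAC_PENDING
```

Written this way, a consequence of the design is easy to see. A single device above 200 pending tasks makes `is_overloaded` true, so as written every enqueue is rejected, not just that device's.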
Worker-side backpressure (inside the process_mac_sensitive_tasks loop):
   ├─ psutil.cpu_percent() > 75%?              → break
   ├─ psutil.virtual_memory().percent > 75%?   → break
   └─ GPU memory usage > 75%?                  → break
4. Simplified Mathematical Model
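4.1 Scheduling Score
The scheduling priority score of device d at time t is:
$$S(d, t) = \alpha \cdot \min\big(W(d, t),\ W_{\max}\big)$$
where:
$W(d, t) = t - t_{\text{first}}(d)$: how long the device's oldest task has been waiting (seconds)
$W_{\max} = 600$: wait-time cap (configurable)
$\alpha = 1.0$: aging weight (configurable)
A higher score means higher priority, and the scheduler uses ZREVRANGE to fetch candidates in descending score order. In the model code, the score stored in the ZSET is recomputed only when _refresh_schedule_score runs (on enqueue, dequeue, backoff resume and purge). Between refreshes it is a snapshot, not a continuously updated value.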
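The score formula and the resulting candidate order can be checked in a few lines. In this sketch, wait times are passed in directly (seconds) and a sort stands in for ZREVRANGE. Ties are broken by reverse lexicographic member order, which is how Redis orders equal scores in ZREVRANGE. Function names are illustrative.

```python
# Aging score S = alpha * min(W, W_max) plus a descending sort that mimics ZREVRANGE.
from typing import Dict, List, Tuple

W_MAX = 600.0  # wait-time cap, seconds
ALPHA = 1.0    # aging weight

def schedule_score(wait_sec: float, w_max: float = W_MAX, alpha: float = ALPHA) -> float:
    """Clamp the wait to [0, w_max] and apply the aging weight."""
    return alpha * min(max(wait_sec, 0.0), w_max)

def zrevrange_order(waits: Dict[str, float]) -> List[Tuple[str, float]]:
    """Highest score first; equal scores fall back to reverse lexicographic order."""
    scored = [(mac, schedule_score(w)) for mac, w in waits.items()]
    return sorted(scored, key=lambda item: (item[1], item[0]), reverse=True)

if __name__ == "__main__":
    print(zrevrange_order({"m1": 15.2, "m2": 42.5, "m3": 8.1, "m4": 900}))
```

Because of the cap, every device that has waited 600 s or more gets exactly the same score. Among those devices the order is decided by member name rather than by actual waiting time.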
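4.2 Dynamic Batch Size
The batch size for the current device $d_{\text{cur}}$ is:
$$B(d_{\text{cur}}) = \begin{cases} B_{\min} = 1, & \text{if } \exists\, d \neq d_{\text{cur}}:\ W(d, t) \ge T_{\text{starve}} \\ B_{\text{norm}} = 10, & \text{otherwise} \end{cases}$$
where:
$T_{\text{starve}} = 30$: starvation threshold (seconds)
4.3 Exponential Backoff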
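The time device d is excluded from scheduling after its n-th defer is:
$$\text{Backoff}(n) = \min\big(\text{Base} \cdot 2^{\,n-1},\ \text{Backoff}_{\max}\big)$$
where:
$\text{Base} = 30$ (seconds)
$\text{Backoff}_{\max} = 600$ (seconds)
$n \in \{1, \dots, 5\}$; when $n > 5$ the task goes to the dead-letter queue
4.4 Overload Condition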
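The system counts as overloaded when:
$$\text{Overloaded} = (G > G_{\max}) \lor (|O| > 0)$$
where:
$G$ = global_pending (total backlog across all devices)
$G_{\max} = 5000$
$O$ = the overloaded_macs set (devices with pending_count > 200)
5. Model Code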
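Below is a standalone, runnable Python model of the core logic of the MAC queue scheduling service. It does not depend on Celery, Flask or any other framework. It only needs the redis package and a reachable Redis server.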
"""
MAC Queue Scheduling Service — Simplified Model
A standalone demonstration of the core MAC-based task scheduling algorithm.
Dependencies: pip install redis
"""
import json
import time
import uuid
import logging
from datetime import datetime, timezone
from typing import Optional
# ============================================================
# Configuration
# ============================================================
class Config:
    REDIS_URL = "redis://127.0.0.1:6379/0"
    # Queue keys
    MAC_QUEUE_PREFIX = "model:mac:queue:"
    MAC_META_PREFIX = "model:mac:meta:"
    MAC_ACTIVE_PREFIX = "model:mac:active:"
    SCHEDULE_KEY = "model:mac:schedule"
    GLOBAL_PENDING_KEY = "model:global_pending"
    OVERLOADED_MACS_KEY = "model:overloaded_macs"
    DEAD_LETTER_KEY = "model:dead_letter"
    # Thresholds
    MAX_TOTAL_PENDING = 5000
    MAX_PER_MAC_PENDING = 200
    ACTIVE_TTL = 900          # dispatch lock TTL, seconds
    TASK_TTL = 3600           # task expiry, seconds
    AGING_MAX_WAIT = 600      # W_max, seconds
    AGING_WEIGHT = 1.0        # alpha
    STARVATION_HINT = 30      # T_starve, seconds
    BATCH_SIZE = 10
    MIN_BATCH_SIZE = 1
    MAX_DEFER_COUNT = 5
    DEFER_BACKOFF_BASE = 30   # seconds
    DEFER_BACKOFF_MAX = 600   # seconds
# ============================================================
# Redis client
# ============================================================
import redis as redis_lib
_redis: Optional[redis_lib.Redis] = None
def get_redis():
    global _redis
    if _redis is None:
        _redis = redis_lib.from_url(Config.REDIS_URL, decode_responses=True)
    return _redis
# ============================================================
# Helper functions
# ============================================================
def _iso_now() -> str:
    return datetime.now(timezone.utc).isoformat()
def _serialize(payload: dict) -> str:
    return json.dumps(payload, ensure_ascii=False)
def _deserialize(raw: Optional[str]) -> Optional[dict]:
    return json.loads(raw) if raw else None
def _now_ts() -> float:
    return time.time()
def _wait_seconds(created_at_iso: Optional[str]) -> float:
    if not created_at_iso:
        return 0.0
    elapsed = (datetime.now(timezone.utc) - datetime.fromisoformat(created_at_iso)).total_seconds()
    return max(0.0, elapsed)
def _schedule_score(wait_sec: float) -> float:
    capped = min(wait_sec, Config.AGING_MAX_WAIT)
    return capped * Config.AGING_WEIGHT
# ============================================================
# Core API — Enqueue
# ============================================================
def enqueue_task(mac: str, task_type: str, payload_data: dict) -> str:
    """
    Enqueue a task into the logical queue of the given MAC.
    Returns the task_id.
    """
    r = get_redis()
    task_id = uuid.uuid4().hex
    payload = {
        "task_id": task_id,
        "task_type": task_type,
        "mac": mac,
        "created_at": _iso_now(),
        **payload_data,
    }
    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"
    # Atomic enqueue (redis-py pipelines wrap the commands in MULTI/EXEC by default)
    pipe = r.pipeline()
    pipe.rpush(queue_key, _serialize(payload))
    pipe.hincrby(meta_key, "pending_count", 1)
    pipe.incr(Config.GLOBAL_PENDING_KEY)
    _, pending_count, _ = pipe.execute()
    # The first pending task records oldest_created_at
    if int(pending_count) == 1:
        r.hset(meta_key, "oldest_created_at", payload["created_at"])
    # Flag the device as overloaded
    if int(pending_count) > Config.MAX_PER_MAC_PENDING:
        r.sadd(Config.OVERLOADED_MACS_KEY, mac)
    # Refresh the scheduling score
    _refresh_schedule_score(mac)
    return task_id
# ============================================================
# Core API — Schedule
# ============================================================
def _refresh_schedule_score(mac: str) -> Optional[float]:
    """Recompute the device's scheduling score and update it in the ZSET."""
    r = get_redis()
    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"
    pending = r.llen(queue_key)
    if pending <= 0:
        r.delete(meta_key)
        r.zrem(Config.SCHEDULE_KEY, mac)
        return None
    oldest = r.hget(meta_key, "oldest_created_at")
    if not oldest:
        # Fall back to the head of the queue, which holds the oldest task
        head = r.lindex(queue_key, 0)
        head_payload = _deserialize(head)
        oldest = head_payload.get("created_at") if head_payload else None
    wait = _wait_seconds(oldest)
    score = _schedule_score(wait)
    r.hset(meta_key, mapping={
        "pending_count": pending,
        "oldest_created_at": oldest or "",
        "updated_at": _iso_now(),
    })
    r.zadd(Config.SCHEDULE_KEY, {mac: score})
    return score
def try_acquire_lock(mac: str) -> bool:
    """Try to take the device's dispatch lock (SET NX + TTL)."""
    r = get_redis()
    active_key = f"{Config.MAC_ACTIVE_PREFIX}{mac}"
    return bool(r.set(active_key, _iso_now(), nx=True, ex=Config.ACTIVE_TTL))
def release_lock(mac: str):
    """Release the device's dispatch lock (plain DEL, without an ownership check)."""
    r = get_redis()
    active_key = f"{Config.MAC_ACTIVE_PREFIX}{mac}"
    r.delete(active_key)
def claim_next_mac(exclude_mac: Optional[str] = None) -> Optional[str]:
    """
    Pick the highest-priority schedulable device from the schedule set.
    Returns the selected mac, or None if no device is available.
    """
    r = get_redis()
    now = _now_ts()
    for candidate in r.zrevrange(Config.SCHEDULE_KEY, 0, -1):
        if exclude_mac and candidate == exclude_mac:
            continue
        # Skip devices that are still in backoff
        meta_key = f"{Config.MAC_META_PREFIX}{candidate}"
        until = r.hget(meta_key, "unschedulable_until")
        if until and float(until) > now:
            continue
        if try_acquire_lock(candidate):
            r.zrem(Config.SCHEDULE_KEY, candidate)
            return candidate
    return None
# ============================================================
# Core API — Dequeue
# ============================================================
def pop_task(mac: str) -> Optional[dict]:
    """
    Pop one valid task from the device queue, skipping expired tasks.
    Returns the payload dict, or None if the queue is empty.
    """
    r = get_redis()
    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"
    while True:
        # LPOP takes the oldest task from the head; with RPUSH on enqueue this is FIFO
        raw = r.lpop(queue_key)
        payload = _deserialize(raw)
        if not payload:
            _refresh_schedule_score(mac)
            return None
        # Lazy expiration check
        created_at = payload.get("created_at")
        if created_at and _wait_seconds(created_at) > Config.TASK_TTL:
            remaining = r.hincrby(meta_key, "pending_count", -1)
            r.decr(Config.GLOBAL_PENDING_KEY)
            if remaining <= 0:
                r.delete(meta_key)
                r.zrem(Config.SCHEDULE_KEY, mac)
            continue  # skip the expired task
        # Valid task
        remaining = r.hincrby(meta_key, "pending_count", -1)
        r.decr(Config.GLOBAL_PENDING_KEY)
        if remaining <= 0:
            r.delete(meta_key)
            r.zrem(Config.SCHEDULE_KEY, mac)
        else:
            # The new head is now the oldest pending task
            head = r.lindex(queue_key, 0)
            head_payload = _deserialize(head)
            if head_payload:
                r.hset(meta_key, "oldest_created_at", head_payload.get("created_at", ""))
            _refresh_schedule_score(mac)
        # Clear the overload flag once the device is back under its threshold
        if remaining <= Config.MAX_PER_MAC_PENDING:
            r.srem(Config.OVERLOADED_MACS_KEY, mac)
        return payload
# ============================================================
# Defer & Backoff
# ============================================================
def requeue_with_defer(payload: dict) -> bool:
    """
    Requeue a task and apply exponential backoff to its device.
    Returns True if requeued, False if moved to the dead-letter queue.
    """
    r = get_redis()
    mac = payload["mac"]
    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"
    defer_count = int(payload.get("defer_count", 0)) + 1
    if defer_count > Config.MAX_DEFER_COUNT:
        # Move to the dead-letter queue
        dead_entry = {
            "task_id": payload.get("task_id", "unknown"),
            "mac": mac,
            "task_type": payload.get("task_type"),
            "error": f"defer count exceeded the limit ({defer_count})",
            "created_at": payload.get("created_at"),
            "dead_at": _iso_now(),
        }
        r.lpush(Config.DEAD_LETTER_KEY, _serialize(dead_entry))
        r.ltrim(Config.DEAD_LETTER_KEY, 0, 9999)  # keep at most 10,000 entries
        return False
    payload["defer_count"] = defer_count
    r.rpush(queue_key, _serialize(payload))
    r.hincrby(meta_key, "pending_count", 1)
    r.incr(Config.GLOBAL_PENDING_KEY)
    # Compute the backoff and store the deadline
    backoff = min(
        Config.DEFER_BACKOFF_BASE * (2 ** (defer_count - 1)),
        Config.DEFER_BACKOFF_MAX,
    )
    unschedulable_until = _now_ts() + backoff
    r.hset(meta_key, "unschedulable_until", str(unschedulable_until))
    r.zrem(Config.SCHEDULE_KEY, mac)
    return True
def resume_expired_backoffs() -> int:
    """Scan for devices whose backoff has expired and make them schedulable again."""
    r = get_redis()
    now = _now_ts()
    cursor = 0
    resumed = 0
    pattern = f"{Config.MAC_META_PREFIX}*"
    while True:
        cursor, keys = r.scan(cursor=cursor, match=pattern, count=200)
        for key in keys:
            until_raw = r.hget(key, "unschedulable_until")
            if not until_raw:
                continue
            if float(until_raw) > now:
                continue
            mac = key[len(Config.MAC_META_PREFIX):]
            r.hdel(key, "unschedulable_until")
            pending = int(r.hget(key, "pending_count") or 0)
            if pending > 0:
                _refresh_schedule_score(mac)
            resumed += 1
        if cursor == 0:
            break
    return resumed
# ============================================================
# Overload Detection
# ============================================================
def is_overloaded() -> bool:
    """Global overload check (O(1): one GET and one SCARD)."""
    r = get_redis()
    total = int(r.get(Config.GLOBAL_PENDING_KEY) or 0)
    if total > Config.MAX_TOTAL_PENDING:
        return True
    if r.scard(Config.OVERLOADED_MACS_KEY) > 0:
        return True
    return False
# ============================================================
# Starvation Detection & Dynamic Batch Size
# ============================================================
def get_oldest_wait(mac: str) -> float:
    """Seconds the device's oldest task has been waiting."""
    r = get_redis()
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"
    oldest = r.hget(meta_key, "oldest_created_at")
    return _wait_seconds(oldest)
def has_starved_mac(exclude_mac: Optional[str] = None) -> bool:
    """Return True if another device has pending work older than the starvation threshold."""
    r = get_redis()
    for candidate in r.zrevrange(Config.SCHEDULE_KEY, 0, -1):
        if exclude_mac and candidate == exclude_mac:
            continue
        meta_key = f"{Config.MAC_META_PREFIX}{candidate}"
        pending = int(r.hget(meta_key, "pending_count") or 0)
        if pending > 0 and get_oldest_wait(candidate) >= Config.STARVATION_HINT:
            return True
    return False
def get_batch_size(current_mac: str) -> int:
    """Dynamic batch size: shrink it when another device is starving, otherwise use the normal size."""
    if has_starved_mac(exclude_mac=current_mac):
        return Config.MIN_BATCH_SIZE
    return Config.BATCH_SIZE
# ============================================================
# Purge Expired Tasks
# ============================================================
def purge_expired_tasks() -> int:
    """Scan all queues and trim expired tasks from their heads in bulk."""
    r = get_redis()
    prefix = Config.MAC_QUEUE_PREFIX
    total_purged = 0
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor=cursor, match=f"{prefix}*", count=200)
        for key in keys:
            mac = key[len(prefix):]
            # Inspect at most 100 entries from the head (oldest first)
            raw_tasks = r.lrange(key, 0, 99)
            if not raw_tasks:
                continue
            # cutoff = index of the first unexpired task; if the whole window is
            # expired, only the scanned window is dropped, never the whole queue
            cutoff = len(raw_tasks)
            for i, raw in enumerate(raw_tasks):
                payload = _deserialize(raw)
                if payload and _wait_seconds(payload.get("created_at")) <= Config.TASK_TTL:
                    cutoff = i
                    break
            if cutoff == 0:
                continue
            # Keep [cutoff, end]; not atomic with concurrent LPOPs (acceptable in this model)
            r.ltrim(key, cutoff, -1)
            r.decrby(Config.GLOBAL_PENDING_KEY, cutoff)
            pending = r.llen(key)
            meta_key = f"{Config.MAC_META_PREFIX}{mac}"
            if pending == 0:
                r.delete(meta_key)
                r.zrem(Config.SCHEDULE_KEY, mac)
            else:
                r.hset(meta_key, "pending_count", pending)
                # The cached oldest_created_at pointed at a purged task; let the refresh re-read the head
                r.hdel(meta_key, "oldest_created_at")
                _refresh_schedule_score(mac)
            total_purged += cutoff
        if cursor == 0:
            break
    return total_purged
# ============================================================
# Watermark Publishing
# ============================================================
def publish_watermark():
    """Publish a snapshot of the system's current load (watermark)."""
    r = get_redis()
    prefix = Config.MAC_QUEUE_PREFIX
    total = 0
    count = 0
    cursor = 0
    # SCAN may return a key more than once, so these counts are approximate
    while True:
        cursor, keys = r.scan(cursor=cursor, match=f"{prefix}*", count=200)
        for key in keys:
            total += r.llen(key)
            count += 1
        if cursor == 0:
            break
    # Nothing in this model writes model:worker:* keys, so active_workers stays 0 here
    worker_cursor = 0
    worker_count = 0
    while True:
        worker_cursor, keys = r.scan(cursor=worker_cursor, match="model:worker:*", count=200)
        worker_count += len(keys)
        if worker_cursor == 0:
            break
    watermark = {
        "total_pending": total,
        "mac_count": count,
        "active_workers": worker_count,
        "timestamp": _iso_now(),
    }
    r.set("model:watermark", json.dumps(watermark, ensure_ascii=False))
# ============================================================
# Demo: Basic usage scenario
# ============================================================
def demo():
    """
    Walk through the core MAC queue scheduling flow:
    1. Enqueue tasks for three devices
    2. Let the scheduler pick the best device
    3. Dequeue and process
    4. Simulate defer + backoff
    5. Starvation detection
    6. Overload detection
    """
    r = get_redis()
    # WARNING: flushdb() wipes the whole selected Redis DB; point REDIS_URL at a scratch DB
    r.flushdb()  # Clean start
    print("=" * 60)
    print("MAC queue scheduling service: model demo")
    print("=" * 60)
    # ── 1. Enqueue tasks for three devices ──
    print("\n[1] Device A enqueues 3 tasks")
    for i in range(3):
        tid = enqueue_task("A", "text", {"seq": i, "data": f"task-{i}"})
        print(f" task_id={tid[:8]}..., seq={i}")
    print("\n[2] Device B enqueues 1 task")
    tid = enqueue_task("B", "ocr", {"seq": 0, "data": "image-0"})
    print(f" task_id={tid[:8]}...")
    print("\n[3] Device C enqueues 5 tasks")
    for i in range(5):
        tid = enqueue_task("C", "text", {"seq": i, "data": f"task-{i}"})
        print(f" task_id={tid[:8]}..., seq={i}")
    # ── 2. Inspect scheduling scores ──
    print("\n[4] Scheduling scores (based only on the oldest task's wait time, not queue length)")
    scores = r.zrevrange(Config.SCHEDULE_KEY, 0, -1, withscores=True)
    for mac, score in scores:
        print(f" {mac}: score={score:.1f}")
    # ── 3. Simulate scheduling ──
    print("\n[5] Simulated scheduling (claim_next_mac)")
    selected = claim_next_mac()
    print(f" Selected device: {selected}")
    # ── 4. Dequeue ──
    print(f"\n[6] Dequeue 2 tasks from {selected}")
    for _ in range(2):
        task = pop_task(selected)
        if task:
            print(f" Dequeued: {task['task_id'][:8]}... type={task['task_type']}")
        else:
            print(" Queue is empty")
    # ── 5. Release the lock ──
    release_lock(selected)
    print(f"\n[7] Released the dispatch lock of {selected}")
    # ── 6. Starvation detection ──
    print(f"\n[8] Starvation check: has_starved_mac(exclude={selected}) = {has_starved_mac(selected)}")
    print(f" batch_size = {get_batch_size(selected)} (1 when another device is starving, normally {Config.BATCH_SIZE})")
    # ── 7. Defer backoff ──
    print("\n[9] Simulated defer backoff")
    fake_payload = {
        "task_id": uuid.uuid4().hex,
        "task_type": "ocr",
        "mac": "B",
        "created_at": _iso_now(),
    }
    # The same payload is requeued three times; requeue_with_defer bumps its defer_count each time
    for d in range(1, 4):
        result = requeue_with_defer(fake_payload)
        print(f" defer #{d}: {'requeued' if result else 'dead-lettered'}, "
              f"backoff={Config.DEFER_BACKOFF_BASE * (2**(d-1))}s")
    # ── 8. Overload detection ──
    print(f"\n[10] Overload check: is_overloaded() = {is_overloaded()}")
    print(f" global_pending = {r.get(Config.GLOBAL_PENDING_KEY)}")
    # ── 9. Final schedule set ──
    print("\n[11] Final state of the schedule set")
    scores = r.zrevrange(Config.SCHEDULE_KEY, 0, -1, withscores=True)
    if scores:
        for mac, score in scores:
            wait = get_oldest_wait(mac)
            print(f" {mac}: score={score:.1f}, oldest_wait={wait:.1f}s")
    else:
        print(" Schedule set is empty")
    print("\n" + "=" * 60)
    print("Demo finished")
    print("=" * 60)
    r.flushdb()
if __name__ == "__main__":
    demo()
How to run
pip install redis
python mac_queue_model.py
The demo needs a Redis server reachable at Config.REDIS_URL. It calls flushdb() on that database, so use a scratch DB.
Sample output (task_id values are random and change on every run):
============================================================
MAC queue scheduling service: model demo
============================================================
[1] Device A enqueues 3 tasks
 task_id=a1b2c3d4..., seq=0
 task_id=e5f6g7h8..., seq=1
 task_id=i9j0k1l2..., seq=2
[2] Device B enqueues 1 task
 task_id=m3n4o5p6...
[3] Device C enqueues 5 tasks
 task_id=q7r8s9t0..., seq=0
 ...
[4] Scheduling scores (based only on the oldest task's wait time, not queue length)
 C: score=0.0
 A: score=0.0
 B: score=0.0
# Every device has waited only a few milliseconds, so all scores print as 0.0.
# The stored scores are tiny and usually differ, and ZREVRANGE orders by those values;
# members with exactly equal scores would come back in reverse lexicographic order.
[5] Simulated scheduling (claim_next_mac)
 Selected device: C
[6] Dequeue 2 tasks from C
 Dequeued: q7r8s9t0... type=text
 Dequeued: ...
[9] Simulated defer backoff
 defer #1: requeued, backoff=30s
 defer #2: requeued, backoff=60s
 defer #3: requeued, backoff=120s
[10] Overload check: is_overloaded() = False
6. Integration with Celery
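In the production system, the model code above is driven by Celery tasks:
Celery Beat (periodic trigger)
   │
   ▼
dispatch_next_mac_sensitive_tasks (Task)
   ├─ calls claim_next_mac()
   └─ dispatches process_mac_sensitive_tasks
         │
         ▼
process_mac_sensitive_tasks (Task)
   ├─ calls get_batch_size() → pop_task() × batch
   ├─ worker overload self-check (backpressure)
   └─ submits subtasks to the ocr/text queues
         │
         ▼
execute_ocr_sensitive_job (Task, routed to the ocr queue)
execute_text_sensitive_job (Task, routed to the text queue)
With acks_late=True, Celery acknowledges a task only after it has run. If the worker goes away before acknowledging, the broker can redeliver the task (at-least-once delivery, so the jobs should be idempotent). With worker_prefetch_multiplier=1, each worker process prefetches at most one task. Neither setting protects a payload that has already been popped from a MAC list but not yet submitted as a subtask.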
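The sketch below shows how the settings and periods mentioned in this document could be expressed as a Celery configuration. It is written as a plain dict so it runs without Celery installed. The option names are standard Celery (4.x+) settings (`task_acks_late` is the global form of the `acks_late` task option). The `tasks.` module prefix in the task names and the Beat entry names are hypothetical.

```python
# Plain-dict sketch of a Celery configuration for this service.
# Option names are real Celery settings; the "tasks." module path is hypothetical.
CELERY_CONFIG = {
    # Acknowledge only after the task body has run (global form of acks_late=True).
    "task_acks_late": True,
    # Each worker process prefetches at most one task.
    "worker_prefetch_multiplier": 1,
    # Route execution jobs to dedicated queues (GPU workers consume "ocr").
    "task_routes": {
        "tasks.execute_ocr_sensitive_job": {"queue": "ocr"},
        "tasks.execute_text_sensitive_job": {"queue": "text"},
    },
    # Periodic triggers: global dispatcher every 15 s (3.2), TTL purge every 300 s (3.5).
    "beat_schedule": {
        "dispatch-next-mac": {
            "task": "tasks.dispatch_next_mac_sensitive_tasks",
            "schedule": 15.0,
        },
        "purge-expired-mac-tasks": {
            "task": "tasks.purge_expired_mac_tasks",
            "schedule": 300.0,
        },
    },
}
# With Celery installed, apply it with: app.conf.update(CELERY_CONFIG)
```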
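7. Key Design Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Queue type | Redis List (RPUSH/LPOP) | O(1) enqueue and dequeue; appending at the tail and popping from the head gives FIFO order without extra sorting |
| Scheduling priority | Redis ZSet | Native score ordering; ZADD/ZREM are O(log N) |
| Distributed lock | SET NX + TTL | No external dependency; the TTL releases locks left behind by crashed workers |
| Global counter | Redis String + INCR/DECR | O(1) atomic operations, safe under high concurrency |
| Expiration check | Lazy + periodic | Lazy checks add no extra work; the periodic purge stops expired tasks from accumulating |
| Backoff state | Hash field + timestamp | Simple structure; expired backoffs are cleared by the periodic task |
| Starvation detection | Walk the ZSet and compute wait times | Checked in every scheduling round, so it is precise, but a check can cost O(N) Redis reads in the number of devices |
| Dead-letter queue | List + LTRIM capacity cap | Simple and reliable; LTRIM keeps it from growing without bound |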
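The starvation-detection row and the batch-size rule from 4.2 can be illustrated with in-memory state. This is a minimal sketch: per-device pending counts and oldest-task wait times are passed in as dicts instead of being read from Redis, and the function names are illustrative.

```python
# In-memory sketch of starvation detection and the dynamic batch size from 4.2.
from typing import Dict, Optional

T_STARVE = 30.0      # starvation threshold, seconds
B_MIN, B_NORM = 1, 10

def has_starved_mac(pending: Dict[str, int], oldest_wait: Dict[str, float],
                    exclude_mac: Optional[str] = None) -> bool:
    """True if any device other than exclude_mac has pending work waiting >= T_STARVE."""
    for mac, count in pending.items():
        if mac == exclude_mac:
            continue
        if count > 0 and oldest_wait.get(mac, 0.0) >= T_STARVE:
            return True
    return False

def batch_size(current_mac: str, pending: Dict[str, int], oldest_wait: Dict[str, float]) -> int:
    """Shrink the batch to B_MIN when another device is starving, else use B_NORM."""
    return B_MIN if has_starved_mac(pending, oldest_wait, exclude_mac=current_mac) else B_NORM
```

Take pending = {"A": 3, "B": 1} with A's oldest task waiting 120 s and B's 5 s. Processing A gives a batch of 10, because A's own wait is excluded. Processing B gives a batch of 1, because A is starving. Shrinking the batch returns control to the global scheduler sooner, at the cost of more dispatch round trips.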

