MAC-Based Task Queue Scheduling Service for Device Clients


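1. Overview

The MAC-based task queue scheduling service for device clients is a Redis-backed asynchronous task scheduling middleware that isolates work per device identifier (MAC address). It addresses three problems:

  • Fair multi-tenant scheduling: many devices share a limited pool of Workers, and every device's tasks must still be handled promptly
  • Resource-aware scheduling: tasks differ in their CPU/GPU requirements, so dispatch has to adapt to the current state of Worker resources
  • Self-protection: one device sending an abnormal volume of tasks must not overwhelm the system, and an overloaded system must not cascade into an avalanche

The core idea: maintain an independent logical queue for each device, compute each device's scheduling score from an aging priority, and use a distributed mutex so that multiple Workers can schedule fairly and safely.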


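2. Data Model

All scheduler state lives in Redis. The diagram shows the core data structures; the table in 2.1 lists them in full, including the watermark snapshot that the diagram omits: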

┌──────────────────────────────────────────────────────────────────┐
│                   MAC Queue Scheduling System                    │
│ ┌────────────────┬──────────────┬─────────────┬────────────────┐ │
│ │ Device queues  │ Metadata     │ Schedule    │ Global counter │ │
│ │ (List)         │ (Hash)       │ set (ZSET)  │ (String)       │ │
│ │                │              │             │                │ │
│ │ task_mac_q:m1  │ meta:m1      │ schedule    │ global_pend    │ │
│ │ task_mac_q:m2  │ meta:m2      │ [m1:15.2]   │  → 128         │ │
│ │ task_mac_q:m3  │ meta:m3      │ [m2:42.5]   │                │ │
│ │ ...            │ ...          │ [m3:8.1]    │                │ │
│ └────────────────┴──────────────┴─────────────┴────────────────┘ │
│                                                                  │
│ ┌────────────────────┬───────────────────┬─────────────────────┐ │
│ │ Run lock           │ Overloaded set    │ Dead-letter queue   │ │
│ │ (String)           │ (Set)             │ (List)              │ │
│ │                    │                   │                     │ │
│ │ active:m1 → TTL    │ overloaded:       │ dead_letter         │ │
│ │ active:m2 → TTL    │   {m3, m7}        │   [{task1}, ...]    │ │
│ └────────────────────┴───────────────────┴─────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘

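2.1 Data Structure Reference

Purpose | Redis type | Key format | Value description
--------|------------|------------|------------------
Device queue | List | sensitive:mac:queue:{mac} | JSON-serialized task payloads, in enqueue order
Device metadata | Hash | sensitive:mac:meta:{mac} | Fields: pending_count (backlog size), oldest_created_at (creation time of the oldest task), unschedulable_until (backoff deadline), updated_at (last update time)
Schedule set | ZSet | sensitive:mac:schedule | member = mac, score = aging-based scheduling score
Global counter | String | sensitive:global_pending | Total backlog across all devices (atomic counter)
Run lock | String | sensitive:mac:active:{mac} | SET NX + TTL; keeps multiple Workers from scheduling the same device at once
Overloaded set | Set | sensitive:overloaded_macs | Devices whose backlog exceeds the per-device threshold
Dead-letter queue | List | sensitive:dead_letter | Records of tasks that exceeded the maximum number of defers
Watermark snapshot | String | sensitive:watermark | Periodically published JSON snapshot of system state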

3. Core Operations

3.1 Enqueue Flow

Client request
    │
    ▼
Check global overload ──> overloaded ──> reject enqueue, return 503
    │
    ▼ (normal)
Build task payload (task_id, task_type, mac, created_at, etc.)
    │
    ▼
Redis Pipeline:
    ├─ RPUSH queue_key, serialized_payload   (append to the queue tail)
    ├─ HINCRBY meta_key pending_count 1       (increment the backlog counter)
    └─ INCR  sensitive:global_pending          (increment the global counter)
    │
    ▼
If this is the device's first task
    └─ HSET meta_key oldest_created_at = payload.created_at
    │
    ▼
If pending_count > per-device threshold
    └─ SADD sensitive:overloaded_macs mac
    │
    ▼
refresh_mac_schedule_score(mac)
    └─ recompute the device's scheduling score and update the ZSET

Key code:

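import json

def enqueue_payload(payload):
    # `client` is a redis-py connection defined elsewhere; key names follow the table in 2.1.
    mac = payload["mac"]
    queue_key = f"sensitive:mac:queue:{mac}"
    meta_key = f"sensitive:mac:meta:{mac}"
    serialized_payload = json.dumps(payload, ensure_ascii=False)

    # Push the task and bump both counters in a single pipeline.
    pipeline = client.pipeline()
    pipeline.rpush(queue_key, serialized_payload)
    pipeline.hincrby(meta_key, "pending_count", 1)
    pipeline.incr("sensitive:global_pending")
    _, pending_count, _ = pipeline.execute()

    # First task for this device: record when its oldest task was created.
    if int(pending_count) == 1:
        client.hset(meta_key, mapping={"oldest_created_at": payload["created_at"]})
    # Backlog above the per-device threshold: flag the device as overloaded.
    if int(pending_count) > CELERY_MAX_PER_MAC_PENDING:
        client.sadd("sensitive:overloaded_macs", mac)

    refresh_mac_schedule_score(mac)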

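3.2 Scheduling Flow

The system uses a two-level scheduling model, implemented inside Celery Worker processes through chained task calls:

Level 1: global scheduler

Celery Beat (every 15s)
    │
    ▼
dispatch_next_mac_sensitive_tasks
    ├─ _resume_expired_backoff_macs()    ← resume devices whose backoff has expired
    └─ claim_next_schedulable_mac()
         │
         ├─ ZREVRANGE schedule 0 -1      ← walk all candidate devices, highest score first
         ├─ skip exclude_mac (the device that was just processed)
         ├─ skip devices still in backoff (check unschedulable_until)
         └─ try_acquire_mac_dispatch()   ← take the distributed lock with SET NX
              │
              success → remove the device from the ZSET → dispatch the process task
              failure → try the next candidate

Level 2: per-device scheduler

process_mac_sensitive_tasks(mac)
    │
    ├─ compute the dynamic batch size
    │    ├─ has_starved_mac()? → batch = MIN_BATCH (1)
    │    └─ no starvation → batch = NORMAL_BATCH (10)
    │
    ├─ loop (batch_size times):
    │    ├─ _is_local_worker_overloaded()? → break (backpressure)
    │    ├─ refresh_mac_dispatch() → EXPIRE to renew the lock
    │    ├─ pop_next_mac_task()
    │    │    └─ LPOP queue → expiry check → return a valid task
    │    └─ submit a subtask to the matching queue according to task_type
    │
    └─ finally:
         ├─ release the run lock (DEL active key)
         ├─ tasks remaining? → update the ZSET score; otherwise remove from the ZSET
         └─ starved devices? → re-dispatch the global scheduler (with exclude_mac)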
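
The first-level selection can be sketched without Redis. The snippet below is a minimal in-memory illustration, not the service's implementation: `scores`, `backoff_until` and `locks` are hypothetical stand-ins for the schedule ZSET, the `unschedulable_until` metadata field and the run-lock keys.

```python
from typing import Dict, Optional, Set


def claim_next(scores: Dict[str, float],
               backoff_until: Dict[str, float],
               locks: Set[str],
               now: float,
               exclude_mac: Optional[str] = None) -> Optional[str]:
    """Pick the highest-scoring device that is not excluded, backing off, or locked."""
    # Highest score first, mirroring ZREVRANGE schedule 0 -1.
    for mac in sorted(scores, key=scores.get, reverse=True):
        if mac == exclude_mac:
            continue  # the device that was just processed
        if backoff_until.get(mac, 0.0) > now:
            continue  # still inside its backoff window
        if mac in locks:
            continue  # another Worker holds the run lock (SET NX would fail)
        locks.add(mac)   # stands in for SET NX + TTL
        del scores[mac]  # stands in for ZREM schedule mac
        return mac
    return None


scores = {"m1": 15.2, "m2": 42.5, "m3": 8.1}
# m2 has the highest score but is backing off until t=100, so m1 is claimed.
claimed = claim_next(scores, backoff_until={"m2": 100.0}, locks=set(), now=50.0)
print(claimed)
```

In this run the highest-scoring device is skipped because of its backoff, which is the same reason the real scheduler walks the whole candidate list rather than looking only at the top entry.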

3.3 Dequeue Flow

pop_next_mac_task(mac)
    │
    ▼
LPOP queue_key    ← pop from the queue head (RPUSH + LPOP gives FIFO semantics)
    │
    ▼
Task empty?
    ├─ Yes → refresh_mac_schedule_score() → return None
    │
    └─ No → TTL expired?
         ├─ expired → HINCRBY pending_count -1 → DECR global_pending
         │          → log it → continue (fetch the next task)
         │
         └─ valid → HINCRBY pending_count -1
                    → DECR global_pending
                    → pending_count <= 0?
                         ├─ Yes → delete the meta key, remove from the ZSET
                         └─ No  → update oldest_created_at
                                   → refresh_mac_schedule_score()
                    → pending_count <= threshold?
                         └─ SREM overloaded_macs
                    → return payload
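
Which end of the list is popped determines the ordering. In the small sketch below, `collections.deque` stands in for a Redis List (an assumption made purely for illustration): `append` plays the role of RPUSH, `popleft` of LPOP, and `pop` of RPOP. Pushing at the tail and popping at the head is FIFO, while pushing and popping at the tail is LIFO.

```python
from collections import deque

queue = deque()
for task in ["t1", "t2", "t3"]:
    queue.append(task)  # RPUSH: append at the tail

# LPOP: take from the head, so the oldest task comes out first.
fifo_order = [queue.popleft() for _ in range(3)]

stack = deque(["t1", "t2", "t3"])
# RPOP: take from the tail, so the newest task comes out first.
lifo_order = [stack.pop() for _ in range(3)]

print(fifo_order, lifo_order)
```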

3.4 Defer and Exponential Backoff

When an OCR task is about to be dispatched and no GPU Worker is available, the defer logic is triggered:

requeue_task_with_defer(payload)
    │
    ├─ defer_count = payload.defer_count + 1
    │
    ├─ defer_count > MAX_DEFER(5)?
    │    └─ Yes → LPUSH dead_letter + LTRIM 0 9999 → return False (dead letter)
    │
    └─ No → RPUSH queue_key (re-enqueue)
            → pending_count +1 → global_pending +1
            → _apply_defer_backoff(mac, defer_count)
                 │
                 ├─ backoff = min(30 × 2^(defer-1), 600)
                 ├─ HSET meta_key unschedulable_until = now + backoff
                 └─ ZREM schedule mac (remove from the schedule set)
    │
    ▼
    return True (re-enqueued)

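Backoff schedule:

#  | Backoff | Cumulative wait
---|---------|----------------
1  | 30s     | 30s
2  | 60s     | 90s
3  | 120s    | 210s
4  | 240s    | 450s
5  | 480s    | 930s (the next defer sends the task to the dead-letter queue)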
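
The schedule can be reproduced directly from the formula `min(30 × 2^(defer-1), 600)`. This is a short sketch using the constants stated in the text; the function name is hypothetical.

```python
from itertools import accumulate

BASE = 30        # seconds
CAP = 600        # seconds
MAX_DEFER = 5    # defers allowed before a task is dead-lettered


def backoff_seconds(defer_count: int) -> int:
    """Backoff applied on the given (1-based) defer."""
    return min(BASE * 2 ** (defer_count - 1), CAP)


backoffs = [backoff_seconds(n) for n in range(1, MAX_DEFER + 1)]
cumulative = list(accumulate(backoffs))

for n, (b, c) in enumerate(zip(backoffs, cumulative), start=1):
    print(f"defer #{n}: backoff={b}s cumulative={c}s")
```

With these constants the 600s cap is never reached, since the fifth and last backoff is 480s; the cap only matters if `MAX_DEFER` is raised.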

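3.5 Task TTL Expiry

Two complementary expiry mechanisms keep tasks from piling up indefinitely:

Mechanism 1: lazy expiry (checked at dequeue time)
───────────────────────────
In pop_next_mac_task:
    if (now - created_at) > TTL(3600s):
        discard the task, update the counters, fetch the next one

Mechanism 2: periodic bulk cleanup (scheduled scan of all queues)
───────────────────────────
purge_expired_mac_tasks (runs every 300s):
    get_all_mac_queues() → non-blocking SCAN over all queues
    for each mac:
        LRANGE the first 100 entries from the head
        count the consecutive expired entries
        LTRIM to keep the unexpired remainder (bulk delete)
        update the global counter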
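
The core of the periodic cleanup is finding the run of expired tasks at the head of a queue and trimming it. Below is a minimal sketch on a plain Python list; the list of creation timestamps is a hypothetical stand-in for the queue, and slicing stands in for LTRIM.

```python
from typing import List, Tuple

TASK_TTL = 3600  # seconds, as stated in the text


def trim_expired_prefix(created_ats: List[float], now: float,
                        scan_limit: int = 100) -> Tuple[List[float], int]:
    """Drop the consecutive expired entries at the head, scanning at most scan_limit."""
    expired = 0
    for ts in created_ats[:scan_limit]:
        if now - ts > TASK_TTL:
            expired += 1
        else:
            break  # the queue is FIFO, so everything after this is newer
    # Equivalent to LTRIM key <expired> -1: keep everything past the expired prefix.
    return created_ats[expired:], expired


queue = [1000.0, 5000.0, 7000.0, 9000.0]  # oldest task at the head
remaining, purged = trim_expired_prefix(queue, now=10000.0)
print(remaining, purged)
```

Because only the scanned prefix is trimmed, entries beyond `scan_limit` are left for the next run even when every scanned entry has expired.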

3.6 Overload Protection

is_queue_overloaded()
    ├─ GET "sensitive:global_pending" > 5000? → True (global throttling)
    └─ SCARD "sensitive:overloaded_macs" > 0? → True (per-device throttling)

Worker-side backpressure (inside the process_mac_sensitive_tasks loop):
    ├─ psutil.cpu_percent() > 75%? → break
    ├─ psutil.virtual_memory().percent > 75%? → break
    └─ GPU memory usage > 75%? → break
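
The Worker-side check reduces to comparing a few utilization readings against one limit. The sketch below takes the readings as arguments so it runs without extra dependencies; in the service they would come from `psutil.cpu_percent()`, `psutil.virtual_memory().percent` and a GPU memory probe. The function name and signature are assumptions made for illustration.

```python
from typing import Optional

LIMIT_PCT = 75.0  # threshold from the text


def worker_overloaded(cpu_pct: float, mem_pct: float,
                      gpu_mem_pct: Optional[float] = None,
                      limit: float = LIMIT_PCT) -> bool:
    """Return True if any available reading is strictly above the limit."""
    readings = [cpu_pct, mem_pct]
    if gpu_mem_pct is not None:  # CPU-only Workers have no GPU reading
        readings.append(gpu_mem_pct)
    return any(value > limit for value in readings)


print(worker_overloaded(50.0, 60.0))        # all readings under the limit
print(worker_overloaded(10.0, 10.0, 90.0))  # GPU memory above the limit
```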

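4. Simplified Mathematical Model

4.1 Scheduling Score

The scheduling priority score of device d at time t is:

S(d, t) = min(W(d, t), W_max) × α

where:
  W(d, t) = t - t_first(d)     ← wait time of the device's oldest task (seconds)
  W_max   = 600                 ← cap on the wait time (configurable)
  α       = 1.0                 ← aging weight (configurable)

A higher score means a higher priority. The scheduler uses ZREVRANGE to fetch candidate devices in descending score order.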
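
As a quick check of the formula, the sketch below scores three devices and ranks them. The wait times are made-up sample values.

```python
W_MAX = 600.0   # cap on wait time, seconds
ALPHA = 1.0     # aging weight


def schedule_score(wait_seconds: float) -> float:
    """S = min(W, W_max) * alpha."""
    return min(wait_seconds, W_MAX) * ALPHA


waits = {"m1": 15.2, "m2": 900.0, "m3": 8.1}  # sample oldest-task wait times
ranked = sorted(waits, key=lambda mac: schedule_score(waits[mac]), reverse=True)

print(ranked)
print(schedule_score(waits["m2"]))  # capped at W_MAX
```

One consequence of the cap is that all devices that have waited longer than W_max receive the same score, so the score alone no longer distinguishes between them.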

4.2 Dynamic Batch Size

The batch size for the current device d_cur is:

B(d_cur) = /  B_min  = 1     ,  if ∃ d ≠ d_cur : W(d, t) ≥ T_starve
           \  B_norm = 10    ,  otherwise

where:
  T_starve = 30 (starvation threshold, in seconds)
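
The piecewise rule translates into a few lines. In this sketch, `waits` is a hypothetical mapping from each device to the wait time of its oldest task.

```python
from typing import Dict

T_STARVE = 30.0  # starvation threshold, seconds
B_MIN = 1
B_NORM = 10


def batch_size(current_mac: str, waits: Dict[str, float]) -> int:
    """Shrink the batch to B_MIN when any other device has waited T_STARVE or longer."""
    starved = any(wait >= T_STARVE
                  for mac, wait in waits.items() if mac != current_mac)
    return B_MIN if starved else B_NORM


print(batch_size("m1", {"m1": 100.0, "m2": 5.0}))   # only m1 itself is old
print(batch_size("m1", {"m1": 1.0, "m2": 30.0}))    # m2 is starved
```

The current device's own wait is deliberately ignored, so a device with a long backlog does not shrink its own batch.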

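4.3 Exponential Backoff

The length of time device d is barred from scheduling on its n-th backoff is:

Backoff(n) = min(Base × 2^(n-1), Backoff_max)

where:
  Base       = 30 (seconds)
  Backoff_max = 600 (seconds)
  n ∈ [1, 5]; for n > 5 the task goes to the dead-letter queue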

4.4 Overload Condition

The system is considered overloaded when:

Overloaded = (G > G_max) ∨ (|O| > 0)

where:
  G     = global_pending (total backlog across all devices)
  G_max = 5000
  O     = the overloaded_macs set (devices with pending_count > 200)
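
The condition can be evaluated from per-device backlog counts alone. The sketch below derives both G and O from a hypothetical `pending_by_mac` mapping; the service itself maintains them incrementally in Redis rather than recomputing them.

```python
from typing import Dict

G_MAX = 5000        # global backlog limit
PER_MAC_MAX = 200   # per-device backlog limit


def is_overloaded(pending_by_mac: Dict[str, int]) -> bool:
    """Overloaded = (G > G_max) or (|O| > 0)."""
    g = sum(pending_by_mac.values())
    overloaded_macs = {mac for mac, n in pending_by_mac.items() if n > PER_MAC_MAX}
    return g > G_MAX or len(overloaded_macs) > 0


print(is_overloaded({"m1": 10, "m2": 20}))                 # neither condition holds
print(is_overloaded({"m1": 201, "m2": 0}))                 # one device over its limit
print(is_overloaded({f"m{i}": 200 for i in range(26)}))    # 5200 total, no single device over
```

As the formula is written, a single device above its threshold marks the whole system as overloaded, not just that device.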

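5. Model Code

The following is a standalone, runnable Python model that implements the core logic of the MAC queue scheduling service. It does not depend on frameworks such as Celery or Flask, only on the redis library.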

"""
MAC Queue Scheduling Service — Simplified Model

A standalone demonstration of the core MAC-based task scheduling algorithm.

Dependencies: pip install redis
"""

import json
import time
import uuid
import logging
from datetime import datetime, timezone
from typing import Optional

# ============================================================
# Configuration
# ============================================================

class Config:
    REDIS_URL = "redis://127.0.0.1:6379/0"

    # Queue keys
    MAC_QUEUE_PREFIX = "model:mac:queue:"
    MAC_META_PREFIX = "model:mac:meta:"
    MAC_ACTIVE_PREFIX = "model:mac:active:"
    SCHEDULE_KEY = "model:mac:schedule"
    GLOBAL_PENDING_KEY = "model:global_pending"
    OVERLOADED_MACS_KEY = "model:overloaded_macs"
    DEAD_LETTER_KEY = "model:dead_letter"

    # Thresholds
    MAX_TOTAL_PENDING = 5000
    MAX_PER_MAC_PENDING = 200
    ACTIVE_TTL = 900
    TASK_TTL = 3600
    AGING_MAX_WAIT = 600
    AGING_WEIGHT = 1.0
    STARVATION_HINT = 30
    BATCH_SIZE = 10
    MIN_BATCH_SIZE = 1
    MAX_DEFER_COUNT = 5
    DEFER_BACKOFF_BASE = 30
    DEFER_BACKOFF_MAX = 600


# ============================================================
# Redis client
# ============================================================

import redis as redis_lib

_redis: Optional[redis_lib.Redis] = None

def get_redis():
    global _redis
    if _redis is None:
        _redis = redis_lib.from_url(Config.REDIS_URL, decode_responses=True)
    return _redis


# ============================================================
# Helper functions
# ============================================================

def _iso_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def _serialize(payload: dict) -> str:
    return json.dumps(payload, ensure_ascii=False)


def _deserialize(raw: Optional[str]) -> Optional[dict]:
    return json.loads(raw) if raw else None


def _now_ts() -> float:
    return time.time()


def _wait_seconds(created_at_iso: Optional[str]) -> float:
    if not created_at_iso:
        return 0.0
    elapsed = (datetime.now(timezone.utc) - datetime.fromisoformat(created_at_iso)).total_seconds()
    return max(0.0, elapsed)


def _schedule_score(wait_sec: float) -> float:
    capped = min(wait_sec, Config.AGING_MAX_WAIT)
    return capped * Config.AGING_WEIGHT


# ============================================================
# Core API — Enqueue
# ============================================================

def enqueue_task(mac: str, task_type: str, payload_data: dict) -> str:
    """
    Enqueue a task onto the logical queue of the given MAC.
    Returns the task_id.
    """
    r = get_redis()
    task_id = uuid.uuid4().hex

    payload = {
        "task_id": task_id,
        "task_type": task_type,
        "mac": mac,
        "created_at": _iso_now(),
        **payload_data,
    }

    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"

    # Atomic enqueue (redis-py pipelines run as a MULTI/EXEC transaction by default)
    pipe = r.pipeline()
    pipe.rpush(queue_key, _serialize(payload))
    pipe.hincrby(meta_key, "pending_count", 1)
    pipe.incr(Config.GLOBAL_PENDING_KEY)
    _, pending_count, _ = pipe.execute()

    # First task for this device: record oldest_created_at
    if int(pending_count) == 1:
        r.hset(meta_key, "oldest_created_at", payload["created_at"])

    # Flag the device as overloaded when its backlog exceeds the threshold
    if int(pending_count) > Config.MAX_PER_MAC_PENDING:
        r.sadd(Config.OVERLOADED_MACS_KEY, mac)

    # Refresh the scheduling score
    _refresh_schedule_score(mac)

    return task_id


# ============================================================
# Core API — Schedule
# ============================================================

def _refresh_schedule_score(mac: str) -> Optional[float]:
    """Recompute and store the device's scheduling score."""
    r = get_redis()
    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"

    pending = r.llen(queue_key)
    if pending <= 0:
        r.delete(meta_key)
        r.zrem(Config.SCHEDULE_KEY, mac)
        return None

    oldest = r.hget(meta_key, "oldest_created_at")
    if not oldest:
        # Fall back to the task at the head of the queue
        head = r.lindex(queue_key, 0)
        head_payload = _deserialize(head)
        oldest = head_payload.get("created_at") if head_payload else None

    wait = _wait_seconds(oldest)
    score = _schedule_score(wait)

    r.hset(meta_key, mapping={
        "pending_count": pending,
        "oldest_created_at": oldest or "",
        "updated_at": _iso_now(),
    })
    r.zadd(Config.SCHEDULE_KEY, {mac: score})
    return score


def try_acquire_lock(mac: str) -> bool:
    """Try to take the device's scheduling lock (SET NX + TTL)."""
    r = get_redis()
    active_key = f"{Config.MAC_ACTIVE_PREFIX}{mac}"
    return bool(r.set(active_key, _iso_now(), nx=True, ex=Config.ACTIVE_TTL))


def release_lock(mac: str):
    """Release the device's scheduling lock."""
    r = get_redis()
    active_key = f"{Config.MAC_ACTIVE_PREFIX}{mac}"
    r.delete(active_key)


def claim_next_mac(exclude_mac: Optional[str] = None) -> Optional[str]:
    """
    Pick the highest-priority schedulable device from the schedule set.
    Returns the selected mac, or None if no device is available.
    """
    r = get_redis()
    now = _now_ts()

    for candidate in r.zrevrange(Config.SCHEDULE_KEY, 0, -1):
        if exclude_mac and candidate == exclude_mac:
            continue

        # Skip devices that are still backing off
        meta_key = f"{Config.MAC_META_PREFIX}{candidate}"
        until = r.hget(meta_key, "unschedulable_until")
        if until and float(until) > now:
            continue

        if try_acquire_lock(candidate):
            r.zrem(Config.SCHEDULE_KEY, candidate)
            return candidate

    return None


# ============================================================
# Core API — Dequeue
# ============================================================

def pop_task(mac: str) -> Optional[dict]:
    """
    Pop one valid task from the device queue (expired tasks are filtered out automatically).
    Returns the payload dict, or None if the queue is empty.
    """
    r = get_redis()
    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"

    while True:
        # Tasks are RPUSHed at the tail, so LPOP from the head yields FIFO order.
        raw = r.lpop(queue_key)
        payload = _deserialize(raw)
        if not payload:
            _refresh_schedule_score(mac)
            return None

        # Lazy expiry check
        created_at = payload.get("created_at")
        if created_at and _wait_seconds(created_at) > Config.TASK_TTL:
            remaining = r.hincrby(meta_key, "pending_count", -1)
            r.decr(Config.GLOBAL_PENDING_KEY)
            if remaining <= 0:
                r.delete(meta_key)
                r.zrem(Config.SCHEDULE_KEY, mac)
            continue  # skip the expired task

        # Valid task
        remaining = r.hincrby(meta_key, "pending_count", -1)
        r.decr(Config.GLOBAL_PENDING_KEY)

        if remaining <= 0:
            r.delete(meta_key)
            r.zrem(Config.SCHEDULE_KEY, mac)
        else:
            # Update oldest_created_at from the new head of the queue
            head = r.lindex(queue_key, 0)
            head_payload = _deserialize(head)
            if head_payload:
                r.hset(meta_key, "oldest_created_at", head_payload["created_at"])
            _refresh_schedule_score(mac)

        # Clear the overload flag once the backlog is back under the threshold
        if remaining <= Config.MAX_PER_MAC_PENDING:
            r.srem(Config.OVERLOADED_MACS_KEY, mac)

        return payload


# ============================================================
# Defer & Backoff
# ============================================================

def requeue_with_defer(payload: dict) -> bool:
    """
    Re-enqueue a task and apply exponential backoff to its device.
    Returns True if the task was re-enqueued, False if it was dead-lettered.
    """
    r = get_redis()
    mac = payload["mac"]
    queue_key = f"{Config.MAC_QUEUE_PREFIX}{mac}"
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"

    defer_count = int(payload.get("defer_count", 0)) + 1

    if defer_count > Config.MAX_DEFER_COUNT:
        # Move the task to the dead-letter queue
        dead_entry = {
            "task_id": payload.get("task_id", "unknown"),
            "mac": mac,
            "task_type": payload.get("task_type"),
            "error": f"defer count exceeded threshold ({defer_count})",
            "created_at": payload.get("created_at"),
            "dead_at": _iso_now(),
        }
        r.lpush(Config.DEAD_LETTER_KEY, _serialize(dead_entry))
        r.ltrim(Config.DEAD_LETTER_KEY, 0, 9999)
        return False

    payload["defer_count"] = defer_count
    r.rpush(queue_key, _serialize(payload))
    r.hincrby(meta_key, "pending_count", 1)
    r.incr(Config.GLOBAL_PENDING_KEY)

    # Compute the backoff duration and record the deadline
    backoff = min(
        Config.DEFER_BACKOFF_BASE * (2 ** (defer_count - 1)),
        Config.DEFER_BACKOFF_MAX,
    )
    unschedulable_until = _now_ts() + backoff
    r.hset(meta_key, "unschedulable_until", str(unschedulable_until))
    r.zrem(Config.SCHEDULE_KEY, mac)

    return True


def resume_expired_backoffs() -> int:
    """Scan for devices whose backoff has expired and make them schedulable again."""
    r = get_redis()
    now = _now_ts()
    cursor = 0
    resumed = 0
    pattern = f"{Config.MAC_META_PREFIX}*"

    while True:
        cursor, keys = r.scan(cursor=cursor, match=pattern, count=200)
        for key in keys:
            until_raw = r.hget(key, "unschedulable_until")
            if not until_raw:
                continue
            if float(until_raw) > now:
                continue
            mac = key[len(Config.MAC_META_PREFIX):]
            r.hdel(key, "unschedulable_until")
            pending = int(r.hget(key, "pending_count") or 0)
            if pending > 0:
                _refresh_schedule_score(mac)
            resumed += 1
        if cursor == 0:
            break
    return resumed


# ============================================================
# Overload Detection
# ============================================================

def is_overloaded() -> bool:
    """Global overload check (O(1))."""
    r = get_redis()
    total = int(r.get(Config.GLOBAL_PENDING_KEY) or 0)
    if total > Config.MAX_TOTAL_PENDING:
        return True
    if r.scard(Config.OVERLOADED_MACS_KEY) > 0:
        return True
    return False


# ============================================================
# Starvation Detection & Dynamic Batch Size
# ============================================================

def get_oldest_wait(mac: str) -> float:
    """Return how many seconds the device's oldest task has been waiting."""
    r = get_redis()
    meta_key = f"{Config.MAC_META_PREFIX}{mac}"
    oldest = r.hget(meta_key, "oldest_created_at")
    return _wait_seconds(oldest)


def has_starved_mac(exclude_mac: Optional[str] = None) -> bool:
    """Check whether any device has waited past the starvation threshold."""
    r = get_redis()
    for candidate in r.zrevrange(Config.SCHEDULE_KEY, 0, -1):
        if exclude_mac and candidate == exclude_mac:
            continue
        meta_key = f"{Config.MAC_META_PREFIX}{candidate}"
        pending = int(r.hget(meta_key, "pending_count") or 0)
        if pending > 0 and get_oldest_wait(candidate) >= Config.STARVATION_HINT:
            return True
    return False


def get_batch_size(current_mac: str) -> int:
    """Dynamic batch size: reduced when another device is starved, normal otherwise."""
    if has_starved_mac(exclude_mac=current_mac):
        return Config.MIN_BATCH_SIZE
    return Config.BATCH_SIZE


# ============================================================
# Purge Expired Tasks
# ============================================================

def purge_expired_tasks() -> int:
    """Scan all queues and bulk-remove expired tasks."""
    r = get_redis()
    prefix = Config.MAC_QUEUE_PREFIX
    total_purged = 0
    cursor = 0

    while True:
        cursor, keys = r.scan(cursor=cursor, match=f"{prefix}*", count=200)
        for key in keys:
            mac = key[len(prefix):]
            queue_len = r.llen(key)
            if queue_len == 0:
                continue

            scan_batch = min(queue_len, 100)
            raw_tasks = r.lrange(key, 0, scan_batch - 1)

            expired = 0
            cutoff = None
            for i, raw in enumerate(raw_tasks):
                payload = _deserialize(raw)
                if not payload:
                    continue
                if _wait_seconds(payload.get("created_at")) > Config.TASK_TTL:
                    expired += 1
                else:
                    if cutoff is None:
                        cutoff = i
                    break

            if expired == 0:
                continue

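            if cutoff is not None:
                r.ltrim(key, cutoff, -1)
            else:
                # Every scanned entry is expired: drop only the scanned prefix,
                # because deleting the key would also discard unscanned tasks.
                r.ltrim(key, len(raw_tasks), -1)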

            r.decrby(Config.GLOBAL_PENDING_KEY, expired)
            pending = r.llen(key)
            meta_key = f"{Config.MAC_META_PREFIX}{mac}"
            if pending == 0:
                r.delete(meta_key)
                r.zrem(Config.SCHEDULE_KEY, mac)
            else:
                r.hset(meta_key, "pending_count", pending)
                _refresh_schedule_score(mac)
            total_purged += expired

        if cursor == 0:
            break
    return total_purged


# ============================================================
# Watermark Publishing
# ============================================================

def publish_watermark():
    """Publish a watermark snapshot of the system state."""
    r = get_redis()
    prefix = Config.MAC_QUEUE_PREFIX
    total = 0
    count = 0
    cursor = 0

    while True:
        cursor, keys = r.scan(cursor=cursor, match=f"{prefix}*", count=200)
        for key in keys:
            total += r.llen(key)
            count += 1
        if cursor == 0:
            break

    worker_cursor = 0
    worker_count = 0
    while True:
        worker_cursor, keys = r.scan(cursor=worker_cursor, match="model:worker:*", count=200)
        worker_count += len(keys)
        if worker_cursor == 0:
            break

    watermark = {
        "total_pending": total,
        "mac_count": count,
        "active_workers": worker_count,
        "timestamp": _iso_now(),
    }
    r.set("model:watermark", json.dumps(watermark, ensure_ascii=False))


# ============================================================
# Demo: Basic usage scenario
# ============================================================

def demo():
    """
    Demonstrates the core MAC queue scheduling flow:
    1. Three devices enqueue 3, 1 and 5 tasks respectively
    2. The scheduler picks the best device
    3. Tasks are dequeued
    4. Defer + backoff is simulated
    5. Starvation detection
    6. Overload detection
    """
    r = get_redis()
    r.flushdb()  # Clean start (wipes the current Redis DB; use a scratch instance)

    print("=" * 60)
    print("MAC queue scheduling service: model demo")
    print("=" * 60)

    # ── 1. Each of the three devices enqueues a few tasks ──
    print("\n[1] Device A enqueues 3 tasks")
    for i in range(3):
        tid = enqueue_task("A", "text", {"seq": i, "data": f"task-{i}"})
        print(f"    task_id={tid[:8]}..., seq={i}")

    print("\n[2] Device B enqueues 1 task")
    tid = enqueue_task("B", "ocr", {"seq": 0, "data": "image-0"})
    print(f"    task_id={tid[:8]}...")

    print("\n[3] Device C enqueues 5 tasks")
    for i in range(5):
        tid = enqueue_task("C", "text", {"seq": i, "data": f"task-{i}"})
        print(f"    task_id={tid[:8]}..., seq={i}")

    # ── 2. Inspect the scheduling scores ──
    print("\n[4] Scheduling scores (depend only on the oldest task's wait time, not on queue length)")
    scores = r.zrevrange(Config.SCHEDULE_KEY, 0, -1, withscores=True)
    for mac, score in scores:
        print(f"    {mac}: score={score:.1f}")

    # ── 3. Simulate scheduling ──
    print("\n[5] Simulated scheduling (claim_next_mac)")
    selected = claim_next_mac()
    print(f"    Selected device: {selected}")

    # ── 4. Dequeue ──
    print(f"\n[6] Dequeue 2 tasks from {selected}")
    for _ in range(2):
        task = pop_task(selected)
        if task:
            print(f"    dequeued: {task['task_id'][:8]}... type={task['task_type']}")
        else:
            print("    queue is empty")

    # ── 5. Release the lock ──
    release_lock(selected)
    print(f"\n[7] Released the run lock of {selected}")

    # ── 6. Starvation detection ──
    print(f"\n[8] Starvation check: has_starved_mac(exclude={selected}) = {has_starved_mac(selected)}")
    print(f"    batch_size = {get_batch_size(selected)} (1 when another device is starved, {Config.BATCH_SIZE} otherwise)")

    # ── 7. Defer backoff ──
    print(f"\n[9] Simulated defer backoff")
    fake_payload = {
        "task_id": uuid.uuid4().hex,
        "task_type": "ocr",
        "mac": "B",
        "created_at": _iso_now(),
    }
    # requeue_with_defer mutates fake_payload["defer_count"], so each pass is one more defer.
    for d in range(1, 4):
        result = requeue_with_defer(fake_payload)
        print(f"    defer #{d}: {'requeued' if result else 'dead-lettered'}, "
              f"backoff={Config.DEFER_BACKOFF_BASE * (2**(d-1))}s")

    # ── 8. Overload detection ──
    print(f"\n[10] Overload check: is_overloaded() = {is_overloaded()}")
    print(f"     global_pending = {r.get(Config.GLOBAL_PENDING_KEY)}")

    # ── 9. Final schedule set ──
    print("\n[11] Final state of the schedule set")
    scores = r.zrevrange(Config.SCHEDULE_KEY, 0, -1, withscores=True)
    if scores:
        for mac, score in scores:
            wait = get_oldest_wait(mac)
            print(f"    {mac}: score={score:.1f}, oldest_wait={wait:.1f}s")
    else:
        print("    schedule set is empty")

    print("\n" + "=" * 60)
    print("Demo complete")
    print("=" * 60)
    r.flushdb()


if __name__ == "__main__":
    demo()

How to run

pip install redis
python mac_queue_model.py

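Sample expected output:

============================================================
MAC queue scheduling service: model demo
============================================================

[1] Device A enqueues 3 tasks
    task_id=a1b2c3d4..., seq=0
    task_id=e5f6g7h8..., seq=1
    task_id=i9j0k1l2..., seq=2

[2] Device B enqueues 1 task
    task_id=m3n4o5p6...

[3] Device C enqueues 5 tasks
    task_id=q7r8s9t0..., seq=0
    ...

[4] Scheduling scores (depend only on the oldest task's wait time, not on queue length)
    C: score=0.0
    A: score=0.0
    B: score=0.0
    # All devices have waited close to 0 seconds, so every score displays as 0.0
    # The order reflects sub-second score differences hidden by rounding;
    # exact ties are returned by ZREVRANGE in reverse lexicographic member order

[5] Simulated scheduling (claim_next_mac)
    Selected device: C

[6] Dequeue 2 tasks from C
    dequeued: q7r8s9t0... type=text
    dequeued: ...

[9] Simulated defer backoff
    defer #1: requeued, backoff=30s
    defer #2: requeued, backoff=60s
    defer #3: requeued, backoff=120s

[10] Overload check: is_overloaded() = False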

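6. Integration with Celery

In the production system, the model code above is driven by Celery Tasks:

Celery Beat (periodic trigger)
    │
    ▼
dispatch_next_mac_sensitive_tasks (Task)
    ├─ calls claim_next_mac()
    └─ dispatches process_mac_sensitive_tasks
         │
         ▼
process_mac_sensitive_tasks (Task)
    ├─ calls get_batch_size() → pop_task() × batch
    ├─ Worker overload self-check (backpressure)
    └─ submits subtasks to the ocr/text queues
         │
         ▼
execute_ocr_sensitive_job (Task, routed to the ocr queue)
execute_text_sensitive_job (Task, routed to the text queue)

Celery's acks_late=True and worker_prefetch_multiplier=1 settings work together: with late acknowledgement a task is acknowledged only after it has run, so a task held by a crashed Worker is redelivered rather than lost, and a prefetch multiplier of 1 keeps each Worker process from reserving more than one task ahead.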
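
For reference, a configuration along these lines might look like the fragment below. This is a sketch under assumptions: the `tasks.` module path and the schedule entry names are hypothetical, the intervals are the 15s and 300s quoted earlier, and `task_acks_late` is Celery's global setting corresponding to the per-task `acks_late=True` option.

```python
# Settings dictionary that could be applied with app.conf.update(celery_settings)
# on an existing Celery app; module paths below are placeholders.
celery_settings = {
    "task_acks_late": True,            # acknowledge after the task has run
    "worker_prefetch_multiplier": 1,   # reserve at most one task per Worker process
    "beat_schedule": {
        "dispatch-mac-tasks": {
            "task": "tasks.dispatch_next_mac_sensitive_tasks",  # hypothetical path
            "schedule": 15.0,   # seconds
        },
        "purge-expired-mac-tasks": {
            "task": "tasks.purge_expired_mac_tasks",  # hypothetical path
            "schedule": 300.0,  # seconds
        },
    },
}

print(sorted(celery_settings["beat_schedule"]))
```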


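7. Key Design Decisions

Decision | Choice | Rationale
---------|--------|----------
Queue type | Redis List (RPUSH/LPOP) | O(1) enqueue and dequeue, naturally FIFO, no extra sorting needed
Scheduling priority | Redis ZSet | Native score ordering, O(log N) operations
Distributed lock | SET NX + TTL | No external dependency; the TTL recovers from deadlocks automatically
Global counter | Redis String + INCR/DECR | O(1) atomic operations, safe under high concurrency
Expiry check | Lazy + periodic | The lazy check adds no extra overhead; the periodic sweep prevents buildup
Backoff state | Hash field + timestamp | Simple structure; expired backoffs are resumed by a periodic task
Starvation detection | Iterate the ZSet and compute wait times | Checked before every scheduling round, high precision
Dead-letter queue | List + LTRIM cap | Simple and reliable, prevents unbounded growth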

Notice: 一代明君的小屋 | All rights reserved | Original content unless otherwise noted | This site is licensed under BY-NC-SA
