celery基于对象的逻辑栈分配设计


运行逻辑图

flowchart TD
    A[客户端请求<br/>/text/handler 或 /calculate/handler] --> B[Router 参数校验/解密/取敏感词]
    B --> C[handlerService.text_handler / calculate_handler]

    C --> D[生成业务 task_id]
    D --> E[按 object 标准化 queue_object]
    E --> F[写入 Redis 逻辑队列<br/>handler:object:queue:{queue_object}]

    F --> G{是否抢到该 object 的运行锁<br/>handler:object:active:{queue_object}}
    G -- 否 --> H[结束提交<br/>等待已有 dispatcher 继续消费]
    G -- 是 --> I[投递 Celery dispatcher 任务<br/>dispatch_object_handler_tasks(object)]

    I --> J[Celery worker 固定消费<br/>handler.dispatch]
    J --> K[dispatcher 启动]
    K --> L[循环处理最多 N 条<br/>CELERY_object_DISPATCH_BATCH_SIZE]

    L --> M[从 Redis 队列 lpop 一条任务]
    M --> N{还有任务吗}
    N -- 否 --> O[释放 object 运行锁]
    O --> P[本轮 dispatcher 结束]

    N -- 是 --> Q{任务类型}
    Q -- text_handle --> R[_run_text_handler_job]
    Q -- calculate_handler --> S[_run_calculate_handler_job]

    R --> T[写 handlerDetectRecord processing]
    T --> U[执行敏感词匹配]
    U --> V{命中敏感词?}
    V -- 是 --> W[send_commend + write_log]
    V -- 否 --> X[正常结束]
    W --> Y[写 handlerDetectRecord success/failed]
    X --> Y

    S --> Z[写 calculateEngineRecord/handlerDetectRecord processing]
    Z --> AA[perform_calculate]
    AA --> AB{calculate 成功?}
    AB -- 否 --> AC[写 calculate failed + Detect failed]
    AB -- 是 --> AD[保存 calculate 成功结果]
    AD --> AE[执行敏感词匹配]
    AE --> AF{命中敏感词?}
    AF -- 是 --> AG[send_commend + write_log]
    AF -- 否 --> AH[正常结束]
    AG --> AI[写 handlerDetectRecord success/failed]
    AH --> AI

    Y --> AJ[刷新 object 锁 TTL]
    AI --> AJ
    AJ --> L

    L --> AK{本轮处理后 Redis 仍有剩余任务?}
    AK -- 否 --> O
    AK -- 是 --> AL[再次 apply_async 投递 dispatcher]
    AL --> P

简化时序图

sequenceDiagram
    participant Client as 客户端
    participant Router as Router
    participant Service as handlerService
    participant Redis as Redis逻辑栈
    participant Celery as Celery Dispatcher
    participant Job as 业务执行函数
    participant DB as DB/日志/命令下发

    Client->>Router: 提交 text/calculate 请求
    Router->>Service: 调用 text_handler / calculate_handler
    Service->>Redis: rpush 到 object 对应逻辑队列
    Service->>Redis: setnx active 锁
    alt 抢锁成功
        Service->>Celery: apply_async(dispatch_object_handler_tasks)
    else 抢锁失败
        Service-->>Client: 返回 task_id,等待已有 dispatcher 消费
    end

    Celery->>Redis: lpop 一条 object 任务
    Redis-->>Celery: 返回 payload
    Celery->>Job: 调用 _run_text_handler_job / _run_calculate_handler_job
    Job->>DB: 写 processing 记录
    Job->>Job: calculate/敏感词检测
    opt 命中敏感词
        Job->>DB: write_log
        Job->>DB: send_commend
    end
    Job->>DB: 写 success/failed 记录

    Celery->>Redis: 检查是否还有剩余任务
    alt 还有剩余
        Celery->>Celery: 续投 dispatcher
    else 队列已空
        Celery->>Redis: 删除 active 锁
    end

核心理解

  • 同一个 object:只会有一个 dispatcher 持有锁,所以严格串行。
  • 不同 object:会进入各自 Redis 逻辑栈,由多个 Celery worker 并行推进。
  • Celery 侧:只维护固定 dispatcher 队列 handler.dispatch,不会因为设备增多而爆出大量物理队列。

声明:一代明君的小屋|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - celery基于对象的逻辑栈分配设计


欢迎来到我的小屋