运行逻辑图
flowchart TD
A[客户端请求<br/>/text/handler 或 /calculate/handler] --> B[Router 参数校验/解密/取敏感词]
B --> C[handlerService.text_handler / calculate_handler]
C --> D[生成业务 task_id]
D --> E[按 object 标准化 queue_object]
E --> F[写入 Redis 逻辑队列<br/>handler:object:queue:{queue_object}]
F --> G{是否抢到该 object 的运行锁<br/>handler:object:active:{queue_object}}
G -- 否 --> H[结束提交<br/>等待已有 dispatcher 继续消费]
G -- 是 --> I[投递 Celery dispatcher 任务<br/>dispatch_object_handler_tasks(object)]
I --> J[Celery worker 固定消费<br/>handler.dispatch]
J --> K[dispatcher 启动]
K --> L[循环处理最多 N 条<br/>CELERY_object_DISPATCH_BATCH_SIZE]
L --> M[从 Redis 队列 lpop 一条任务]
M --> N{还有任务吗}
N -- 否 --> O[释放 object 运行锁]
O --> P[本轮 dispatcher 结束]
N -- 是 --> Q{任务类型}
Q -- text_handle --> R[_run_text_handler_job]
Q -- calculate_handler --> S[_run_calculate_handler_job]
R --> T[写 handlerDetectRecord processing]
T --> U[执行敏感词匹配]
U --> V{命中敏感词?}
V -- 是 --> W[send_commend + write_log]
V -- 否 --> X[正常结束]
W --> Y[写 handlerDetectRecord success/failed]
X --> Y
S --> Z[写 calculateEngineRecord/handlerDetectRecord processing]
Z --> AA[perform_calculate]
AA --> AB{calculate 成功?}
AB -- 否 --> AC[写 calculate failed + Detect failed]
AB -- 是 --> AD[保存 calculate 成功结果]
AD --> AE[执行敏感词匹配]
AE --> AF{命中敏感词?}
AF -- 是 --> AG[send_commend + write_log]
AF -- 否 --> AH[正常结束]
AG --> AI[写 handlerDetectRecord success/failed]
AH --> AI
Y --> AJ[刷新 object 锁 TTL]
AI --> AJ
AJ --> L
L --> AK{本轮处理后 Redis 仍有剩余任务?}
AK -- 否 --> O
AK -- 是 --> AL[再次 apply_async 投递 dispatcher]
AL --> P简化时序图
sequenceDiagram
participant Client as 客户端
participant Router as Router
participant Service as handlerService
participant Redis as Redis逻辑栈
participant Celery as Celery Dispatcher
participant Job as 业务执行函数
participant DB as DB/日志/命令下发
Client->>Router: 提交 text/calculate 请求
Router->>Service: 调用 text_handler / calculate_handler
Service->>Redis: rpush 到 object 对应逻辑队列
Service->>Redis: setnx active 锁
alt 抢锁成功
Service->>Celery: apply_async(dispatch_object_handler_tasks)
else 抢锁失败
Service-->>Client: 返回 task_id,等待已有 dispatcher 消费
end
Celery->>Redis: lpop 一条 object 任务
Redis-->>Celery: 返回 payload
Celery->>Job: 调用 _run_text_handler_job / _run_calculate_handler_job
Job->>DB: 写 processing 记录
Job->>Job: calculate/敏感词检测
opt 命中敏感词
Job->>DB: write_log
Job->>DB: send_commend
end
Job->>DB: 写 success/failed 记录
Celery->>Redis: 检查是否还有剩余任务
alt 还有剩余
Celery->>Celery: 续投 dispatcher
else 队列已空
Celery->>Redis: 删除 active 锁
end核心理解
- 同一个
object:只会有一个 dispatcher 持有锁,所以严格串行。 - 不同
object:会进入各自 Redis 逻辑栈,由多个 Celery worker 并行推进。 - Celery 侧:只维护固定 dispatcher 队列
handler.dispatch,不会因为设备增多而爆出大量物理队列。


Comments | NOTHING