1.5 KiB
1.5 KiB
Message Queue — 实现细节
主流程见 SKILL.md,本文档为配置、Job、投递、事件驱动、监控的完整代码。
队列配置
// config/autoload/async_queue.php
return [
'default' => [
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
'redis' => ['pool' => 'default'],
'channel' => env('QUEUE_CHANNEL', '{queue}'),
'timeout' => 2,
'retry_seconds' => [1, 5, 10, 30, 60],
'handle_timeout' => 60,
'processes' => 1,
'concurrent' => ['limit' => 10],
],
'notification' => [ /* 独立队列,更高优先级 */ ],
];
Job 类模板
class {{JobName}}Job extends Job
{
protected int $maxAttempts = 3;
public function __construct(protected readonly int $resourceId, protected readonly array $payload = []) {}
public function handle(): void {
$resource = $this->getResource();
if (!$resource || $this->isAlreadyProcessed($resource)) return;
try { $this->process($resource); }
catch (\Throwable $e) { logger()->error(...); throw $e; }
}
}
投递与事件驱动
QueueService:dispatch($job, $delay)、dispatchNotification($job)、dispatchDelayed($job, $delaySeconds)。事件驱动:Event → Listener → QueueService->dispatch。示例:OrderStatusChanged → 异步通知、同步外部、更新统计缓存(debounce 5s)。
监控命令
Redis {queue}:waiting、{queue}:delayed、{queue}:failed、{queue}:timeout。failed > 100 告警。