Adding data to the queue

Use the dispatch helper to add a job to the queue:

dispatch(new UserConvertUploadJob([
    'convert_logs_id' => 1,
    'upload_reason_type' => 2,
]));

The dispatch helper then wraps the job in a new PendingDispatch instance:

new PendingDispatch($job)

class PendingDispatch
{

    public function __construct($job)
    {
        $this->job = $job;
    }


    public function __destruct()
    {
        if (! $this->shouldDispatch()) {
            return;
        } elseif ($this->afterResponse) {
            app(Dispatcher::class)->dispatchAfterResponse($this->job);
        } else {
            app(Dispatcher::class)->dispatch($this->job);
        }
    }
}
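Note that nothing in the constructor pushes the job; the work happens in `__destruct`. The following is a minimal plain-PHP sketch of why that is enough, assuming only standard PHP object lifetime rules. `SketchPendingDispatch` and `sketch_dispatch` are hypothetical stand-ins, not the framework's class or helper.

```php
<?php
// Hypothetical stand-in for Laravel's PendingDispatch; not the framework class.
final class SketchPendingDispatch
{
    /** @var list<string> log shared by all instances, for demonstration only */
    public static array $log = [];

    public function __construct(private string $job)
    {
        self::$log[] = "constructed {$this->job}";
    }

    public function __destruct()
    {
        // The real class hands the job to the Dispatcher at this point.
        self::$log[] = "dispatched {$this->job}";
    }
}

// Hypothetical helper mirroring the shape of dispatch(): it only builds the wrapper.
function sketch_dispatch(string $job): SketchPendingDispatch
{
    return new SketchPendingDispatch($job);
}

// Return value discarded: the object is destroyed as soon as the statement ends.
sketch_dispatch('UserConvertUploadJob');
SketchPendingDispatch::$log[] = 'next statement';

// Return value kept: nothing is dispatched until the last reference is dropped.
$pending = sketch_dispatch('HeldJob');
SketchPendingDispatch::$log[] = 'still holding';
unset($pending);
```

In this sketch the first job is logged as dispatched before `next statement`, while the second is only dispatched after `unset($pending)`. The same lifetime rule is what lets a bare `dispatch(...)` call push the job without any further method call.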

When the PendingDispatch instance is destroyed, its destructor calls the Dispatcher's dispatch method to hand over the job:

class Dispatcher implements QueueingDispatcher
{
    public function dispatch($command)
    {
        return $this->queueResolver && $this->commandShouldBeQueued($command)
                        ? $this->dispatchToQueue($command) // taken here, because the job is an instance of ShouldQueue
                        : $this->dispatchNow($command);
    }

    public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;

        // Resolve the queue connection (Horizon's RedisQueue in this setup)
        $queue = call_user_func($this->queueResolver, $connection);

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }

        return $this->pushCommandToQueue($queue, $command);
    }

    protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }

        // This is the branch usually taken: a queue name is set but no delay.
        // pushOn() in turn delegates to push().
        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }

        return $queue->push($command);
    }
}
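The branch selection above can be summarised in a small plain-PHP sketch. It is an illustration under stated assumptions, not the framework code: `SketchShouldQueue`, `SketchJob` and `sketch_route` are hypothetical names, and the check that a queue resolver exists is left out.

```php
<?php
// Hypothetical marker interface standing in for Illuminate\Contracts\Queue\ShouldQueue.
interface SketchShouldQueue {}

// Hypothetical job; the public $queue and $delay properties mirror the ones the Dispatcher inspects.
final class SketchJob implements SketchShouldQueue
{
    public function __construct(public ?string $queue = null, public ?int $delay = null) {}
}

// Returns the name of the method the Dispatcher would end up calling for $command.
function sketch_route(object $command): string
{
    if (! $command instanceof SketchShouldQueue) {
        return 'dispatchNow'; // not queueable: run synchronously
    }
    if (isset($command->queue, $command->delay)) {
        return 'laterOn';     // named queue and a delay
    }
    if (isset($command->queue)) {
        return 'pushOn';      // named queue, no delay
    }
    if (isset($command->delay)) {
        return 'later';       // default queue with a delay
    }
    return 'push';            // default queue, no delay
}
```

For example, `sketch_route(new SketchJob('uploads'))` selects `pushOn`, `sketch_route(new SketchJob())` selects `push`, and an object that does not implement the marker interface falls through to `dispatchNow`. Because `isset` is false for `null` properties, a job that declares `$queue` but leaves it unset is treated the same as one without the property.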

Next, the push method of Horizon's RedisQueue runs:

namespace Laravel\Horizon;
use Illuminate\Queue\RedisQueue as BaseQueue;

class RedisQueue extends BaseQueue
{
    public function push($job, $data = '', $queue = null)
    {
        return $this->enqueueUsing(
            $job,
            $this->createPayload($job, $this->getQueue($queue), $data),
            $queue,
            null,
            function ($payload, $queue) use ($job) {
                $this->lastPushed = $job;

                // Executed here: writes the payload to the queue and to Horizon's Redis keys
                return $this->pushRaw($payload, $queue);
            }
        );
    }
}
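To make the `$payload` argument more concrete, here is a simplified plain-PHP sketch of the JSON string that ends up in Redis. It is an approximation: `SketchUploadJob` and `sketch_payload` are hypothetical, the two identifiers are random placeholders, and the real payload built by `createPayload` carries further fields (retry settings and metadata added by Horizon) that are omitted here.

```php
<?php
// Hypothetical job class used only for this sketch.
final class SketchUploadJob
{
    public function __construct(public array $params) {}
}

// Builds a simplified queue payload; the framework's version has more fields.
function sketch_payload(object $job): string
{
    return json_encode([
        'uuid' => bin2hex(random_bytes(16)),   // placeholder; the framework generates a real UUID
        'displayName' => get_class($job),
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job), // the job object itself, serialized
        ],
        'id' => bin2hex(random_bytes(16)),     // placeholder for the random job id
        'attempts' => 0,
    ], JSON_THROW_ON_ERROR);
}

$payload = sketch_payload(new SketchUploadJob([
    'convert_logs_id' => 1,
    'upload_reason_type' => 2,
]));

// A worker would later decode the payload and unserialize the command.
$decoded = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
$restored = unserialize($decoded['data']['command']);
```

The point of the sketch is that the job travels through Redis as a serialized object inside a JSON envelope, so the constructor arguments passed to `dispatch` are available again once the worker unserializes the command.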

In the end, the following new keys are created in Redis:

lxpzafr2.png

lxpzapeq.png

Last modified: June 22, 2024