0%

queue:work 原理分析

最近在项目中使用了队列,因此研究一下相关的源码。本文只是粗略的进行分析,如果发现了错误欢迎大家讨论交流。

基本实现

php artisan queue:work 的代码实现是在Illuminate\Queue\Console\WorkCommand中。那么,让我们看一下它是怎样处理的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public function handle()
{
if ($this->downForMaintenance() && $this->option('once')) {
return $this->worker->sleep($this->option('sleep'));
}

$this->listenForEvents();

$connection = $this->argument('connection') ?: $this->laravel['config']['queue.default'];

$queue = $this->getQueue($connection);

$this->runWorker($connection, $queue);
}


protected function runWorker($connection, $queue)
{
// 设置缓存
$this->worker->setCache($this->laravel['cache']->driver());

return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);

上面的代码逻辑很简单,代码的核心逻辑在最后一行runWorker中,根据传参的不同,执行方法可能为 runNextJob 或者 daemon,由于本文的目的为研究原理,故只分析 daemon 方法。

daemon方法

daemon 方法存在于Illuminate\Queue\Worker中,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public function daemon($connectionName, $queue, WorkerOptions $options)
{
$this->listenForSignals();

$lastRestart = $this->getTimestampOfLastQueueRestart();

while(true) {
if(! $this->daemonShouldRun($option, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);

continue;
}


$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

$this->registerTimeoutHandler($job, $options);

if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}

$this->stopIfNecessary($options, $lastRestart);
}
}

下面让我们一步一步来分析其中的代码逻辑。

监听信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected function listenForSignals()
{
if ($this->supportsAsyncSignals()) {
pcntl_async_signals(true);

pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
});

pcntl_signal(SIGUSR2, function () {
$this->paused = true;
});

pcntl_signal(SIGCONT, function () {
$this->paused = false;
});
}
}

这段代码主要用到了 PHP 的 pcntl 扩展,具体的介绍和使用方法请看 官方文档 以及 这里

此扩展的主要用于进程控制,根据相关的进程信号值设置对应的参数。

获取上一次重启时间

1
2
3
4
5
6
   protected function getTimestampOfLastQueueRestart()                                                                                    
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}

当上一步之行完之后,应用会获取上一次的队列重启时间戳,即运行命令 php artisan queue:restart 时的时间戳。此时,如果缓存使用Redis的话,使用 redis-cli 登录后并输入 monitor 然后执行的话(monitor 命令用于实时监控 redis 的操作),会看到如下的显示,

1
2
3
4
5
6
7
8
9
1434697488.632958 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697491.634111 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697494.635239 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697497.636391 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697500.637753 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697503.639073 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697506.640155 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697509.641288 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"
1434697512.642365 [0 127.0.0.1:60136] "GET" "laravel:illuminate:queue:restart"

这表示我们正在不断的获取时间戳。

判断是否需要执行队列

接着在一个无限的循环中,我们判断是否可以执行队列,如果答案为否,那么我们将其进行休眠( PHP 中的 sleep 方法),如有必要也可以将对应的进程杀掉(stopIfNecessary 方法)。

执行队列

如果队列可以执行,那么代码将会执行对应的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
return $this->process($connectionName, $job, $options);
} catch (\Exception $e) {
$this->exceptions->report($e);

$this->stopWorkerIfLostConnection($e);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));

$this->stopWorkerIfLostConnection($e);
}
}


public function process($connectionName, $job, WorkerOptions $options)
{
$this->raiseBeforeJobEvent($connectionName, $job);

$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);

$job->fire();

$this->raiseAfterJobEvent($connectionName, $job);

// 省略了错误处理部分的代码
}

我们可以看到runJob的逻辑很简单,代码进行队列处理(process 方法),如果碰到错误或者异常将抛出对应的异常。在 process 方法中首先触发事件,然后判断是否达到最大尝试次数。如果没有,则开始队列处理,fire 方法会根据我们使用那种队列(Redis,Beanstalkd等)来进行对应的实现;

队列执行

1
2
3
4
5
6
7
8
9
10
// Illuminate\Queue\Jobs\Job;
public function fire()
{
$payload = $this->payload();

list($class, $method) = JobName::parse($payload['job]);

($this->instance = $this->resolve($class))->{$method}{$this, $payload['dada']};
}

在 这里 我们通过 payload来获取要执行的对象和方法,其中 payload 内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
[2019-05-16 11:02:04] local.INFO: array (
'displayName' => 'App\\Jobs\\TestJob',
'job' => 'Illuminate\\Queue\\CallQueuedHandler@call',
'maxTries' => NULL,
'timeout' => NULL,
'timeoutAt' => NULL,
'data' =>
array (
'commandName' => 'App\\Jobs\\TestJob',
'command' => 'O:16:"App\\Jobs\\TestJob":7:{s:6:"' . "\0" . '*' . "\0" . 'job";N;s:10:"connection";N;s:5:"queue";N;s:15:"chainConnection";N;s:10:"chainQueue";N;s:5:"delay";N;s:7:"chained";a:0:{}}',
),
)

因此,fire 最后会使用 Illuminate\Queue\CallQueueHandler类中的call方法。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public function call(Job $job, array $data)
{
try {
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}

$this->dispatcher->dispatchNow(
$command, $this->resolveHandler($job, $command)
);

// 其余错误处理代码省略
}

在这里我们首先设定好 Job 实例,然后处理对应的命令和错误,至此,queue:work 的基本原理已经分析完毕。

欢迎关注我的其它发布渠道