Queues

Introduction

在构建 Web 应用程序时,您可能需要执行一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间需要很长时间才能执行。值得庆幸的是,Laravel 允许您轻松创建可以在后台处理的队列作业。通过将时间密集型任务移到队列中,您的应用程序可以以极快的速度响应 Web 请求,并为您的客户提供更好的用户体验。

Laravel 队列为各种不同的队列后端提供统一的队列 API,例如亚马逊SQS,Redis,甚至是关系数据库。

Laravel 的队列配置选项存储在你的应用程序的config/queue.php 配置文件。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,包括数据库、亚马逊SQS,Redis, 和Beanstalkd 驱动程序,以及将立即执行作业的同步驱动程序(在本地开发期间使用)。 Anull 还包括队列驱动程序,它会丢弃排队的作业。

Note
Laravel 现在提供 Horizo​​n,一个漂亮的仪表板和配置系统,适用于 Redis 驱动的队列。查看完整的地平线文档 了解更多信息。

连接比。队列

在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别很重要。在你的config/queue.php 配置文件里面有个connections 配置数组。此选项定义与后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。然而,任何给定的队列连接都可能有多个“队列”,它们可以被认为是不同的堆栈或成堆的排队作业。

请注意,每个连接配置示例中queue 配置文件包含一个queue 属性。这是作业发送到给定连接时将被分派到的默认队列。换句话说,如果您在没有明确定义应将其分派到哪个队列的情况下分派作业,则该作业将被放置在queue连接配置的属性:

use App\Jobs\ProcessPodcast;

// This job is sent to the default connection's default queue...
ProcessPodcast::dispatch();

// This job is sent to the default connection's "emails" queue...
ProcessPodcast::dispatch()->onQueue('emails');

一些应用程序可能不需要将作业推送到多个队列,而是更喜欢有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段作业处理方式的应用程序特别有用,因为 Laravel 队列工作者允许您指定它应该按优先级处理哪些队列。例如,如果您将作业推送到high 队列,你可以运行一个给他们更高处理优先级的工作人员:

php artisan queue:work --queue=high,default

驱动程序说明和先决条件

Database

为了使用database 队列驱动程序,您将需要一个数据库表来保存作业。要生成创建此表的迁移,请运行queue:table 工匠命令。创建迁移后,您可以使用migrate 命令:

php artisan queue:table

php artisan migrate

最后,不要忘记指示您的应用程序使用database 通过更新驱动程序QUEUE_CONNECTION 应用程序中的变量.env 文件:

QUEUE_CONNECTION=database

Redis

为了使用redis 队列驱动程序,你应该在你的config/database.php 配置文件。

Redis集群

如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含关键散列标签.这是为了确保给定队列的所有 Redis 键都放在同一个哈希槽中:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

Blocking

使用 Redis 队列时,您可以使用block_for 配置选项指定驱动程序在遍历工作循环并重新轮询 Redis 数据库之前应等待作业可用的时间。

根据您的队列负载调整此值可能比不断轮询 Redis 数据库以查找新作业更有效。例如,您可以将值设置为5 指示驱动程序在等待作业可用时应阻塞五秒钟:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],

Warning
环境block_for0 将导致队列工作人员无限期阻塞,直到有工作可用。这也将阻止诸如SIGTERM 从被处理到下一个作业被处理。

其他驱动程序先决条件

列出的队列驱动程序需要以下依赖项。这些依赖项可以通过 Composer 包管理器安装:

  • 亚马逊 SQS:aws/aws-sdk-php ~3.0
  • 豆茎:pda/pheanstalk ~4.0
  • 雷迪斯:predis/predis ~1.0 或者 phpredis PHP 扩展

创造工作

生成作业类

默认情况下,应用程序的所有可排队作业都存储在app/Jobs 目录。如果app/Jobs 目录不存在,将在运行时创建make:job 工匠命令:

php artisan make:job ProcessPodcast

生成的类将实现Illuminate\Contracts\Queue\ShouldQueue 接口,向 Laravel 指示作业应该被推送到队列中以异步运行。

Note
可以使用自定义作业存根存根发布.

类结构

作业类非常简单,通常只包含一个handle 队列处理作业时调用的方法。首先,让我们看一个示例作业类。在这个例子中,我们假设我们管理一个播客发布服务并且需要在发布之前处理上传的播客文件:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process uploaded podcast...
    }
}

在此示例中,请注意我们能够传递一个口才模型 直接进入排队作业的构造函数。因为SerializesModels 作业正在使用的特征,Eloquent 模型及其加载的关系将在作业处理时优雅地序列化和反序列化。

如果你的排队作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符将被序列化到队列中。当实际处理作业时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的作业有效负载发送到您的队列驱动程序。

handle 方法依赖注入

handle 当队列处理作业时调用方法。请注意,我们能够对handle 作业的方法。 Laravel服务容器 自动注入这些依赖项。

如果您想完全控制容器如何将依赖项注入到handle 方法,你可以使用容器的bindMethod方法。这bindMethod 方法接受接收作业和容器的回调。在回调中,您可以自由调用handle 随心所欲的方法。通常,您应该从boot 你的方法App\Providers\AppServiceProvider 服务提供者:

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

Warning
二进制数据,例如原始图像内容,应该通过base64_encode 在传递给排队作业之前的函数。否则,作业在放入队列时可能无法正确序列化为 JSON。

队列关系

因为加载的关系也被序列化,所以序列化的作业字符串有时会变得非常大。为防止关系被序列化,您可以调用withoutRelations 设置属性值时模型上的方法。此方法将返回没有加载关系的模型实例:

/**
 * Create a new job instance.
 */
public function __construct(Podcast $podcast)
{
    $this->podcast = $podcast->withoutRelations();
}

此外,当反序列化作业并从数据库中重新检索模型关系时,它们将被完整检索。在作业排队过程中序列化模型之前应用的任何先前关系约束将不会在作业反序列化时应用。因此,如果您希望处理给定关系的子集,您应该在队列作业中重新限制该关系。

独特的工作

Warning
独特的作业需要支持的缓存驱动程序locks.目前,memcached,redis,dynamodb,database,file, 和array 缓存驱动程序支持原子锁。此外,唯一作业约束不适用于批次内的作业。

有时,您可能希望确保在任何时间点队列中只有一个特定作业的实例。您可以通过实施ShouldBeUnique 在你的工作类上的界面。这个接口不需要你在你的类上定义任何额外的方法:

<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...
}

在上面的例子中,UpdateSearchIndex 工作是独一无二的。因此,如果作业的另一个实例已经在队列中并且尚未完成处理,则不会分派该作业。

在某些情况下,您可能想要定义一个特定的“键”来使作业唯一,或者您可能想要指定一个超时时间,超过该时间作业将不再保持唯一。为此,您可以定义uniqueIduniqueFor 作业类的属性或方法:

<?php

use App\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * The product instance.
     *
     * @var \App\Product
     */
    public $product;

    /**
     * The number of seconds after which the job's unique lock will be released.
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * The unique ID of the job.
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的例子中,UpdateSearchIndex 作业通过产品 ID 是唯一的。因此,在现有作业完成处理之前,将忽略具有相同产品 ID 的作业的任何新分派。此外,如果现有作业在一小时内未被处理,则唯一锁将被释放,并且可以将具有相同唯一键的另一个作业调度到队列中。

Warning
如果你的应用程序从多个 web 服务器或容器调度作业,你应该确保你的所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确地确定一个作业是否是唯一的。

在处理开始之前保持作业的唯一性

默认情况下,唯一作业在作业完成处理或所有重试尝试失败后“解锁”。但是,在某些情况下,您可能希望您的作业在处理之前立即解锁。为此,您的工作应该实施ShouldBeUniqueUntilProcessing 合同而不是ShouldBeUnique 合同:

<?php

use App\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

独特的工作锁

在幕后,当一个ShouldBeUnique 作业被调度,Laravel 尝试获取一个lockuniqueId 钥匙。如果未获取锁,则不会分派作业。当作业完成处理或所有重试尝试失败时,将释放此锁。默认情况下,Laravel 会使用默认的缓存驱动来获取这个锁。但是,如果您希望使用另一个驱动程序来获取锁,您可以定义一个uniqueVia 返回应该使用的缓存驱动程序的方法:

use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...

    /**
     * Get the cache driver for the unique job lock.
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}

Note
如果您只需要限制作业的并发处理,请使用WithoutOverlapping 作业中间件代替。

作业中间件

作业中间件允许您围绕排队作业的执行包装自定义逻辑,从而减少作业本身的样板文件。例如,考虑以下handle 方法利用 Laravel 的 Redis 速率限制功能允许每五秒只处理一个作业:

use Illuminate\Support\Facades\Redis;

/**
 * Execute the job.
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('Lock obtained...');

        // Handle job...
    }, function () {
        // Could not obtain lock...

        return $this->release(5);
    });
}

虽然此代码有效,但handle方法变得嘈杂,因为它与 Redis 速率限制逻辑杂乱无章。此外,对于我们想要限制速率的任何其他作业,必须复制此速率限制逻辑。

我们可以定义一个作业中间件来处理速率限制,而不是在 handle 方法中进行速率限制。 Laravel 没有作业中间件的默认位置,因此欢迎您将作业中间件放置在应用程序的任何位置。在这个例子中,我们将把中间件放在一个app/Jobs/Middleware 目录:

<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * Process the queued job.
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use (object $job, Closure $next) {
                    // Lock obtained...

                    $next($job);
                }, function () use ($job) {
                    // Could not obtain lock...

                    $job->release(5);
                });
    }
}

如你所见,就像路由中间件,作业中间件接收正在处理的作业和应调用以继续处理作业的回调。

创建作业中间件后,可以通过从作业的返回它们来将它们附加到作业middleware 方法。此方法不存在于由make:job Artisan 命令,因此您需要手动将其添加到您的作业类中:

use App\Jobs\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}

Note
作业中间件也可以分配给可排队的事件侦听器、邮件和通知。

速率限制

虽然我们刚刚演示了如何编写您自己的限速作业中间件,但 Laravel 实际上包含一个限速中间件,您可以使用它来限速作业。喜欢路由速率限制器,作业速率限制器使用RateLimiter 门面的for 方法。

例如,您可能希望允许用户每小时备份一次数据,同时对高级客户不施加此类限制。为此,您可以定义一个RateLimiter 在里面boot 你的方法AppServiceProvider:

use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
                    ? Limit::none()
                    : Limit::perHour(1)->by($job->user->id);
    });
}

在上面的示例中,我们定义了一个小时速率限制;但是,您可以使用perMinute 方法。此外,您可以将任何您希望的值传递给by 速率限制的方法;但是,此值最常用于按客户划分速率限制:

return Limit::perMinute(50)->by($job->user->id);

定义速率限制后,您可以使用以下方法将速率限制器附加到备份作业Illuminate\Queue\Middleware\RateLimited 中间件。每次作业超过速率限制时,此中间件都会根据速率限制持续时间以适当的延迟将作业释放回队列。

use Illuminate\Queue\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

将速率受限的作业释放回队列仍会增加作业的总数attempts.您可能希望调整您的triesmaxExceptions 相应地在您的工作类别上的属性。或者,您可能希望使用retryUntil 方法 定义不再尝试作业之前的时间量。

如果您不想在速率受限时重试作业,您可以使用dontRelease 方法:

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}

Note
如果你正在使用 Redis,你可以使用Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,针对 Redis 进行了微调,比基本限速中间件更高效。

防止工作重叠

Laravel 包括一个Illuminate\Queue\Middleware\WithoutOverlapping 允许您根据任意键防止作业重叠的中间件。当排队的作业正在修改一次只能由一个作业修改的资源时,这会很有帮助。

例如,假设您有一个更新用户信用评分的排队作业,并且您希望防止同一用户 ID 的信用评分更新作业重叠。为此,您可以返回WithoutOverlapping 你工作的中间件middleware 方法:

use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

任何相同类型的重叠作业将被释放回队列。您还可以指定再次尝试释放的作业之前必须经过的秒数:

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果你想立即删除任何重叠的作业,这样它们就不会被重试,你可以使用dontRelease 方法:

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,您的作业可能会意外失败或超时,导致锁未释放。因此,您可以使用expireAfter 方法。例如,下面的示例将指示 Laravel 释放WithoutOverlapping 作业开始处理三分钟后锁定:

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

WarningWithoutOverlapping 中间件需要一个支持的缓存驱动程序locks.目前,memcached,redis,dynamodb,database,file, 和array 缓存驱动程序支持原子锁。

跨作业类别共享锁定密钥

默认情况下,WithoutOverlapping 中间件只会防止同一类的重叠作业。因此,尽管两个不同的作业类可能使用相同的锁定密钥,但不会阻止它们重叠。但是,您可以指示 Laravel 使用跨作业类应用密钥shared方法:

use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...


    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...


    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

节流异常

Laravel 包括一个Illuminate\Queue\Middleware\ThrottlesExceptions 允许您限制异常的中间件。一旦作业抛出给定数量的异常,所有进一步执行作业的尝试都会延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的作业特别有用。

例如,让我们想象一个与开始抛出异常的第三方 API 交互的排队作业。要限制异常,您可以返回ThrottlesExceptions 你工作的中间件middleware 方法。通常,此中间件应与实现的作业配对基于时间的尝试:

use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5)];
}

/**
 * Determine the time at which the job should timeout.
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(5);
}

中间件接受的第一个构造函数参数是作业在被限制之前可以抛出的异常数,而第二个构造函数参数是作业被限制后再次尝试之前应该经过的分钟数。在上面的代码示例中,如果作业在 5 分钟内抛出 10 个异常,我们将等待 5 分钟,然后再次尝试作业。

当作业抛出异常但尚未达到异常阈值时,通常会立即重试该作业。但是,您可以通过调用backoff 将中间件附加到作业时的方法:

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}

在内部,这个中间件使用 Laravel 的缓存系统来实现速率限制,作业的类名被用作缓存“键”。您可以通过调用by 将中间件附加到您的工作时的方法。如果您有多个作业与同一个第三方服务交互并且您希望它们共享一个通用的节流“桶”,这可能很有用:

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10))->by('key')];
}

Note
如果你正在使用 Redis,你可以使用Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,针对 Redis 进行了微调,比基本的异常节流中间件更高效。

调度作业

一旦你写好了你的工作类,你可以使用dispatch 工作本身的方法。传递给的参数dispatch 方法将被提供给作业的构造函数:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

如果您想有条件地派遣工作,您可以使用dispatchIfdispatchUnless 方法:

ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用程序中,sync 驱动程序是默认的队列驱动程序。该驱动程序在当前请求的前台同步执行作业,这在本地开发时通常很方便。如果你想真正开始排队作业进行后台处理,你可以在你的应用程序中指定一个不同的队列驱动程序config/queue.php 配置文件。

延迟调度

如果你想指定一个作业不应该立即可供队列工作者处理,你可以使用delay 调度作业时的方法。例如,让我们指定作业在分派后 10 分钟后才可用于处理:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
                    ->delay(now()->addMinutes(10));

        return redirect('/podcasts');
    }
}

Warning
Amazon SQS 队列服务的最长延迟时间为 15 分钟。

响应发送到浏览器后调度

或者,dispatchAfterResponse 如果您的 Web 服务器正在使用 FastCGI,方法会延迟调度作业,直到将 HTTP 响应发送到用户的浏览器之后。即使排队的作业仍在执行,这仍将允许用户开始使用该应用程序。这通常应该只用于需要大约一秒钟的工作,例如发送电子邮件。由于它们是在当前 HTTP 请求中处理的,因此以这种方式分派的作业不需要运行队列工作程序来处理它们:

use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

你也可以dispatch 关闭并链接afterResponse 方法到dispatch 在 HTTP 响应发送到浏览器后执行闭包的助手:

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果你想立即(同步)调度一个工作,你可以使用dispatchSync 方法。使用此方法时,作业不会排队,会在当前进程内立即执行:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

作业和数据库事务

虽然在数据库事务中分派作业非常好,但您应该特别小心以确保您的作业实际上能够成功执行。在事务中调度作业时,作业可能会在父事务提交之前由 worker 处理。发生这种情况时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能不存在于数据库中。

值得庆幸的是,Laravel 提供了几种解决这个问题的方法。首先,您可以设置after_commit 队列连接配置数组中的连接选项:

'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

当。。。的时候after_commit 选项是true,您可以在数据库事务中调度作业;然而,Laravel 会等到打开的父数据库事务在实际调度作业之前被提交。当然,如果当前没有打开的数据库事务,作业将被立即调度。

如果一个事务由于在事务过程中发生异常而被回滚,那么在该事务过程中调度的作业将被丢弃。

Note
设定after_commit 配置选项true 还将导致在提交所有打开的数据库事务后分派任何排队的事件侦听器、邮件、通知和广播事件。

指定内联提交调度行为

如果您不设置after_commit 队列连接配置选项true,您仍然可以指示在提交所有打开的数据库事务后应分派特定作业。为此,您可以将afterCommit 方法到您的调度操作:

use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果after_commit 配置选项设置为true,您可以指示应立即分派特定作业,而无需等待任何打开的数据库事务提交:

ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

作业链允许您指定在主作业成功执行后应按顺序运行的排队作业列表。如果序列中的一个作业失败,则其余作业将不会运行。要执行排队的作业链,您可以使用chain 提供的方法Bus 正面。 Laravel 的命令总线是一个较低级别的组件,队列作业调度建立在以下基础之上:

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链接作业类实例之外,您还可以链接闭包:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();

Warning
使用删除作业$this->delete() 作业中的方法不会阻止处理链式作业。只有当链中的作业失败时,链才会停止执行。

链连接和队列

如果您想指定应该用于链接作业的连接和队列,您可以使用onConnectiononQueue 方法。这些方法指定应该使用的队列连接和队列名称,除非排队的作业被显式分配了不同的连接/队列:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

链故障

链接作业时,您可以使用catch 方法来指定在链中的作业失败时应调用的闭包。给定的回调将收到Throwable 导致作业失败的实例:

use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // A job within the chain has failed...
})->dispatch();

Warning
由于链回调被 Laravel 队列稍后序列化和执行,你不应该使用$this 链回调中的变量。

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以对排队的作业进行“分类”,甚至可以优先分配给不同队列的工作人员数量。请记住,这不会将作业推送到队列配置文件定义的不同队列“连接”,而只会推送到单个连接中的特定队列。要指定队列,请使用onQueue 调度作业时的方法:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

或者,您可以通过调用onQueue 作业构造函数中的方法:

<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

调度到特定连接

如果您的应用程序与多个队列连接交互,您可以使用onConnection 方法:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

你可以链接onConnectiononQueue 方法一起指定作业的连接和队列:

ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

或者,您可以通过调用onConnection 作业构造函数中的方法:

<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定最大作业尝试次数/超时值

最大尝试次数

如果您排队的作业之一遇到错误,您可能不希望它无限期地重试。因此,Laravel 提供了多种方式来指定可以尝试作业的次数或持续时间。

指定作业可能尝试的最大次数的一种方法是通过--tries 打开 Artisan 命令行。这将适用于 worker 处理的所有作业,除非正在处理的作业指定可能尝试的次数:

php artisan queue:work --tries=3

如果作业超过其最大尝试次数,它将被视为“失败”作业。有关处理失败作业的更多信息,请参阅失败的工作文件.如果--tries=0 提供给queue:work 命令,作业将无限期地重试。

您可以通过定义可以在作业类本身上尝试作业的最大次数来采用更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于--tries 命令行上提供的值:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 5;
}

基于时间的尝试

作为定义作业在失败之前可以尝试多少次的替代方法,您可以定义一个不应再尝试作业的时间。这允许在给定时间范围内尝试任意次数的作业。要定义不应再尝试作业的时间,请添加retryUntil 方法到你的工作类。这个方法应该返回一个DateTime 实例:

use DateTime;

/**
 * Determine the time at which the job should timeout.
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}

Note
你也可以定义一个tries 财产或retryUntil 你的方法排队的事件监听器.

最大异常

有时您可能希望指定一个作业可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是由release 直接方法)。为此,您可以定义一个maxExceptions 您的工作类别的财产:

<?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 25;

    /**
     * The maximum number of unhandled exceptions to allow before failing.
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // Lock obtained, process the podcast...
        }, function () {
            // Unable to obtain lock...
            return $this->release(10);
        });
    }
}

在这个例子中,如果应用程序无法获得 Redis 锁,作业将被释放 10 秒,并且将继续重试最多 25 次。但是,如果作业抛出三个未处理的异常,作业将失败。

Timeout

Warning
pcntl 必须安装 PHP 扩展才能指定作业超时。

通常,您大致知道您希望排队的作业需要多长时间。出于这个原因,Laravel 允许您指定一个“超时”值。默认情况下,超时值为 60 秒。如果作业的处理时间超过超时值指定的秒数,则处理该作业的工作人员将退出并出错。通常,worker 将由一个自动重启服务器上配置的进程管理器.

作业可以运行的最大秒数可以使用--timeout 打开 Artisan 命令行:

php artisan queue:work --timeout=30

如果作业因持续超时而超过其最大尝试次数,它将被标记为失败。

您还可以定义允许作业在作业类本身上运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 120;
}

有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不遵守您指定的超时时间。因此,在使用这些功能时,您也应该始终尝试使用它们的 API 指定超时。例如,在使用 Guzzle 时,您应该始终指定连接和请求超时值。

超时失败

如果你想表明一个工作应该被标记为failed 超时时,您可以定义$failOnTimeout 作业类的属性:

/**
 * Indicate if the job should be marked as failed on timeout.
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便再次尝试。该作业将继续发布,直到尝试达到您的应用程序允许的最大次数为止。最大尝试次数由--tries 上使用的开关queue:work 工匠命令。或者,可以在作业类本身上定义最大尝试次数。有关运行队列工作程序的更多信息可以在下面找到.

手动释放作业

有时您可能希望手动将作业释放回队列,以便稍后再次尝试。您可以通过调用release 方法:

/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->release();
}

默认情况下,release 方法会将作业释放回队列以便立即处理。但是,通过将整数传递给release 方法,您可以指示队列在给定的秒数过去之前不要让作业可供处理:

$this->release(10);

手动失败作业

有时您可能需要手动将作业标记为“失败”。为此,您可以致电fail方法:

/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->fail();
}

如果你想将你的工作标记为失败,因为你已经捕获了一个异常,你可以将异常传递给fail 方法。或者,为方便起见,您可以传递一个字符串错误消息,该消息将为您转换为异常:

$this->fail($exception);

$this->fail('Something went wrong.');

Note
有关失败作业的更多信息,请查看处理工作失败的文档.

作业批处理

Laravel 的作业批处理功能允许你轻松地执行一批作业,然后在这批作业执行完毕后执行一些操作。在开始之前,您应该创建一个数据库迁移以构建一个表来包含有关您的作业批次的元信息,例如它们的完成百分比。这种迁移可以使用queue:batches-table 工匠命令:

php artisan queue:batches-table

php artisan migrate

定义可批处理作业

要定义可批处理的作业,您应该创建一个可排队的工作 像平常一样;但是,您应该添加Illuminate\Bus\Batchable 工作类别的特征。这个特性提供了访问batch 可用于检索作业正在执行的当前批处理的方法:

<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ImportCsv implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // Determine if the batch has been cancelled...

            return;
        }

        // Import a portion of the CSV file...
    }
}

分派批次

要分派一批作业,您应该使用batch 的方法Bus 正面。当然,批处理主要在与完成回调结合使用时有用。所以,你可以使用then,catch, 和finally 定义批处理完成回调的方法。这些回调中的每一个都会收到一个Illuminate\Bus\Batch 调用它们时的实例。在这个例子中,我们假设我们正在排队一批作业,每个作业处理 CSV 文件中给定数量的行:

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->dispatch();

return $batch->id;

批次的 ID,可以通过访问$batch->id 财产,可用于查询 Laravel 命令总线 有关批次发货后的信息。

Warning
由于批处理回调由 Laravel 队列稍后序列化和执行,你不应该使用$this 回调中的变量。

命名批次

一些工具,如 Laravel Horizo​​n 和 Laravel Telescope,如果批次被命名,可能会为批次提供更加用户友好的调试信息。要为批次分配任意名称,您可以调用name 定义批次时的方法:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import CSV')->dispatch();

批量连接和队列

如果您想指定应用于批处理作业的连接和队列,您可以使用onConnectiononQueue 方法。所有批处理作业必须在相同的连接和队列中执行:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();

批次内链

你可以定义一组连锁工作 通过将链接的作业放在一个数组中来在一个批次中。例如,我们可以并行执行两个作业链,并在两个作业链都完成处理后执行回调:

use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

将作业添加到批次

有时从批处理作业中向批处理添加其他作业可能很有用。当您需要对数千个作业进行批处理,而这些作业在 Web 请求期间可能需要很长时间才能分派时,此模式非常有用。因此,相反,您可能希望分派初始批次的“加载程序”作业,以使用更多作业来补充批次:

$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import Contacts')->dispatch();

在这个例子中,我们将使用LoadImportBatch job 用额外的作业来补充批次。为此,我们可以使用add 可以通过作业的访问的批处理实例上的方法batch 方法:

use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}

Warning
您只能从属于同一批次的作业中将作业添加到批次中。

检查批次

Illuminate\Bus\Batch 提供给批处理完成回调的实例具有多种属性和方法,可帮助您与给定的批处理作业进行交互和检查:

// The UUID of the batch...
$batch->id;

// The name of the batch (if applicable)...
$batch->name;

// The number of jobs assigned to the batch...
$batch->totalJobs;

// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;

// The number of jobs that have failed...
$batch->failedJobs;

// The number of jobs that have been processed thus far...
$batch->processedJobs();

// The completion percentage of the batch (0-100)...
$batch->progress();

// Indicates if the batch has finished executing...
$batch->finished();

// Cancel the execution of the batch...
$batch->cancel();

// Indicates if the batch has been cancelled...
$batch->cancelled();

从路线返回批次

全部Illuminate\Bus\Batch 实例是 JSON 可序列化的,这意味着您可以直接从应用程序的路由之一返回它们以检索包含有关批次的信息的 JSON 有效负载,包括其完成进度。这使得在应用程序的 UI 中显示有关批处理完成进度的信息变得很方便。

要通过 ID 检索批次,您可以使用Bus 门面的findBatch 方法:

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

取消批次

有时您可能需要取消给定批次的执行。这可以通过调用cancel 上的方法Illuminate\Bus\Batch 实例:

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

正如您在前面的示例中可能已经注意到的那样,批处理作业通常应在继续执行之前确定其相应的批处理是否已被取消。但是,为方便起见,您可以将SkipIfBatchCancelled middleware代替工作。顾名思义,如果相应的批次已被取消,此中间件将指示 Laravel 不处理该作业:

use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * Get the middleware the job should pass through.
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

批量失败

当批处理作业失败时,catch 回调(如果已分配)将被调用。此回调仅针对批处理中失败的第一个作业调用。

允许失败

当批处理中的作业失败时,Laravel 会自动将批处理标记为“已取消”。如果您愿意,您可以禁用此行为,以便作业失败不会自动将批次标记为已取消。这可以通过调用allowFailures 分派批次时的方法:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->allowFailures()->dispatch();

重试失败的批处理作业

为了方便起见,Laravel 提供了一个queue:retry-batch 允许您轻松重试给定批次的所有失败作业的 Artisan 命令。这queue:retry-batch 命令接受其失败作业应重试的批次的 UUID:

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批次

在不修剪的情况下,job_batches table 可以非常快速地积累记录。为了减轻这种情况,你应该schedulequeue:prune-batches 每天运行的 Artisan 命令:

$schedule->command('queue:prune-batches')->daily();

默认情况下,将删除所有超过 24 小时的已完成批次。您可以使用hours 调用命令时的选项以确定保留批处理数据的时间。例如,以下命令将删除 48 小时前完成的所有批次:

$schedule->command('queue:prune-batches --hours=48')->daily();

有时候,你的jobs_batches 表可能会累积从未成功完成的批次的批次记录,例如作业失败且从未成功重试的批次。你可以指示queue:prune-batches 使用命令修剪这些未完成的批记录unfinished 选项:

$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,你的jobs_batches 表还可以为已取消的批次累积批次记录。你可以指示queue:prune-batches 命令使用删除这些取消的批记录cancelled 选项:

$schedule->command('queue:prune-batches --hours=48 --cancelled=72')->daily();

排队闭包

除了将作业类分派到队列之外,您还可以分派一个闭包。这非常适合需要在当前请求周期之外执行的快速、简单的任务。将闭包分派到队列时,闭包的代码内容经过加密签名,因此在传输过程中无法修改:

$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

使用catch 方法,您可以提供一个闭包,如果排队的闭包在耗尽所有队列后未能成功完成,则应执行该闭包配置的重试次数:

use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // This job has failed...
});

Warning
自从catch 回调由 Laravel 队列稍后序列化和执行,你不应该使用$this 变量在catch 回调。

运行队列工作者

queue:work 命令

Laravel 包含一个 Artisan 命令,该命令将启动一个队列工作者并在新作业被推送到队列时处理它们。您可以使用queue:work 工匠命令。请注意,一旦queue:work 命令已启动,它将继续运行直到手动停止或您关闭终端:

php artisan queue:work

Note
为了保持queue:work 进程永久在后台运行,您应该使用进程监视器,例如Supervisor 以确保队列工作者不会停止运行。

你可以包括-v 调用时标记queue:work 命令,如果您希望处理的作业 ID 包含在命令的输出中:

php artisan queue:work -v

请记住,队列工作者是长期存在的进程,并将启动的应用程序状态存储在内存中。因此,他们在启动后不会注意到您的代码库中的更改。所以,在部署过程中,一定要重新启动队列工作人员.此外,请记住,您的应用程序创建或修改的任何静态状态都不会在作业之间自动重置。

或者,您可以运行queue:listen 命令。当使用queue:listen 命令,当你想重新加载更新的代码或重置应用程序状态时,你不必手动重启 worker;但是,此命令的效率明显低于queue:work 命令:

php artisan queue:listen

运行多个队列工作者

要将多个工作人员分配到一个队列并同时处理作业,您应该简单地启动多个queue:work 过程。这可以通过终端中的多个选项卡在本地完成,也可以使用流程管理器的配置设置在生产环境中完成。使用主管时, 你可以使用numprocs 配置值。

指定连接和队列

您还可以指定工作人员应使用哪个队列连接。传递给的连接名称work 命令应对应于您定义的连接之一config/queue.php 配置文件:

php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接上默认队列的作业。但是,您可以通过仅处理给定连接的特定队列来进一步自定义您的队列工作者。例如,如果您的所有电子邮件都在emails 在你的队列中排队redis 队列连接,您可以发出以下命令来启动只处理该队列的工作程序:

php artisan queue:work redis --queue=emails

处理指定数量的作业

--once 选项可用于指示工作人员仅处理队列中的单个作业:

php artisan queue:work --once

--max-jobs 选项可用于指示工作人员处理给定数量的作业然后退出。结合使用时,此选项可能很有用Supervisor 以便您的工作人员在处理给定数量的作业后自动重新启动,释放他们可能积累的所有内存:

php artisan queue:work --max-jobs=1000

处理所有排队的作业然后退出

--stop-when-empty 选项可用于指示工作人员处理所有作业,然后优雅地退出。如果您希望在队列为空后关闭容器,则此选项在处理 Docker 容器中的 Laravel 队列时很有用:

php artisan queue:work --stop-when-empty

在给定的秒数内处理作业

--max-time 选项可用于指示工作人员在给定的秒数内处理作业,然后退出。结合使用时,此选项可能很有用Supervisor 这样你的工作人员在处理作业给定时间后自动重启,释放他们可能积累的任何内存:

# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

工人睡眠时长

当队列中有可用作业时,worker 将继续处理作业,作业之间没有延迟。但是,那sleep 选项确定如果没有可用的工作,工人将“睡眠”多少秒。当然,在睡眠期间,worker 不会处理任何新工作:

php artisan queue:work --sleep=3

资源注意事项

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何繁重的资源。例如,如果您正在使用 GD 库进行图像处理,您应该释放内存imagedestroy 完成图像处理后。

队列优先级

有时您可能希望优先处理队列的处理方式。例如,在您的config/queue.php 配置文件,你可以设置默认queue 为您redis 连接到low.但是,有时您可能希望将工作推到high 像这样的优先队列:

dispatch((new Job)->onQueue('high'));

启动一个工人来验证所有的high 队列作业在继续处理任何作业之前被处理low 队列,将逗号分隔的队列名称列表传递给work 命令:

php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作者是长期存在的进程,它们不会在不重新启动的情况下注意到代码的更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重启工作者。您可以通过发出queue:restart 命令:

php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业处理后从容退出,以免现有作业丢失。由于队列工作人员将在queue:restart 命令被执行,你应该运行一个进程管理器,比如Supervisor 自动重启队列工作者。

Note
队列使用cache 存储重启信号,因此您应该在使用此功能之前验证是否为您的应用程序正确配置了缓存驱动程序。

作业到期和超时

职位到期

在你的config/queue.php 配置文件,每个队列连接定义一个retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果值retry_after 被设定为90,如果作业已处理 90 秒而未被释放或删除,则该作业将被释放回队列。通常,您应该设置retry_after您的作业完成处理应该合理花费的最大秒数的值。

Warning
唯一不包含的队列连接retry_after 值为 Amazon SQS。 SQS 将根据默认可见性超时 它在 AWS 控制台中进行管理。

工人超时

queue:work Artisan 命令公开了一个--timeout 选项。默认情况下,--timeout 值为 60 秒。如果作业的处理时间超过超时值指定的秒数,则处理该作业的工作人员将退出并出错。通常,worker 将由一个自动重启服务器上配置的进程管理器:

php artisan queue:work --timeout=60

retry_after 配置选项和--timeout CLI 选项不同,但协同工作可确保作业不会丢失且作业仅成功处理一次。

Warning
--timeout 值应该总是比你的至少短几秒retry_after 配置值。这将确保处理冻结作业的工作人员始终在重试作业之前终止。如果你的--timeout 选项比你的长retry_after 配置值,您的作业可能会被处理两次。

主管配置

在生产中,您需要一种方法来保持您的queue:work 进程运行。 Aqueue:work 进程可能会由于各种原因停止运行,例如超出工作超时或执行queue:restart 命令。

出于这个原因,您需要配置一个进程监视器,它可以检测您何时queue:work 进程退出并自动重启。此外,进程监视器可以让你指定多少queue:work 您希望同时运行的进程。 Supervisor 是 Linux 环境中常用的进程监视器,我们将在后面的文档中讨论如何配置它。

安装主管

Supervisor 是 Linux 操作系统的进程监视器,会自动重启你的queue:work 如果他们失败了。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

sudo apt-get install supervisor

Note
如果自己配置和管理 Supervisor 听起来让人不知所措,请考虑使用Laravel 锻造,它将自动为您的生产 Laravel 项目安装和配置 Supervisor。

配置主管

主管配置文件通常存储在/etc/supervisor/conf.d 目录。在此目录中,您可以创建任意数量的配置文件来指示主管如何监控您的进程。例如,让我们创建一个laravel-worker.conf 启动和监控的文件queue:work 流程:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在这个例子中,numprocs 指令将指示 Supervisor 运行八个queue:work 处理并监视所有这些,如果它们失败则自动重新启动它们。你应该改变command 配置的指令以反映您所需的队列连接和工作人员选项。

Warning
你应该确保的价值stopwaitsecs 大于运行时间最长的作业消耗的秒数。否则,Supervisor 可能会在作业完成处理之前将其杀死。

首任主管

创建配置文件后,您可以更新 Supervisor 配置并使用以下命令启动进程:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

有关 Supervisor 的更多信息,请参阅主管文件.

处理失败的工作

有时您排队的作业会失败。别担心,事情并不总是按计划进行! Laravel 包含一个方便的方法来指定应尝试作业的最大次数.异步作业超过此尝试次数后,它将被插入到failed_jobs 数据库表。同步调度的作业 失败的不存储在此表中,应用程序会立即处理它们的异常。

迁移以创建failed_jobs table 通常已经存在于新的 Laravel 应用程序中。但是,如果您的应用程序不包含此表的迁移,您可以使用queue:failed-table 创建迁移的命令:

php artisan queue:failed-table

php artisan migrate

当运行一个队列工作者 过程中,您可以指定尝试使用作业的最大次数--tries 打开queue:work 命令。如果您没有为--tries选项,作业将只尝试一次或多次,由作业类指定$tries 财产:

php artisan queue:work redis --tries=3

使用--backoff 选项,你可以指定 Laravel 在重试遇到异常的作业之前应该等待多少秒。默认情况下,作业会立即释放回队列,以便可以再次尝试:

php artisan queue:work redis --tries=3 --backoff=3

如果你想配置 Laravel 在重试每个作业遇到异常的作业之前应该等待多少秒,你可以通过定义一个backoff 您的工作类别的财产:

/**
 * The number of seconds to wait before retrying the job.
 *
 * @var int
 */
public $backoff = 3;

如果您需要更复杂的逻辑来确定作业的退避时间,您可以定义一个backoff 你工作类的方法:

/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
    return 3;
}

您可以通过从backoff 方法。在此示例中,第一次重试的重试延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒:

/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
    return [1, 5, 10];
}

作业失败后清理

当特定作业失败时,您可能希望向您的用户发送警报或恢复该作业部分完成的任何操作。为此,您可以定义一个failed 在你的工作类上的方法。这Throwable 导致作业失败的实例将被传递给failed 方法:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process uploaded podcast...
    }

    /**
     * Handle a job failure.
     */
    public function failed(Throwable $exception): void
    {
        // Send user notification of failure, etc...
    }
}

Warning
在调用之前实例化作业的新实例failed 方法;因此,任何可能发生在handle 方法将丢失。

重试失败的作业

查看已插入您的所有失败作业failed_jobs 数据库表,你可以使用queue:failed 工匠命令:

php artisan queue:failed

queue:failed 命令将列出作业的作业 ID、连接、队列、失败时间和其他有关作业的信息。作业 ID 可用于重试失败的作业。例如,重试 ID 为ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece,发出以下命令:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如有必要,您可以将多个 ID 传递给命令:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

您还可以重试特定队列的所有失败作业:

php artisan queue:retry --queue=name

要重试所有失败的作业,请执行queue:retry 命令和传递all 作为身份证:

php artisan queue:retry all

如果你想删除一个失败的作业,你可以使用queue:forget 命令:

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

Note
使用时Horizon,你应该使用horizon:forget 命令删除失败的作业而不是queue:forget 命令。

从中删除所有失败的作业failed_jobs 表,你可以使用queue:flush 命令:

php artisan queue:flush

忽略缺失模型

将 Eloquent 模型注入作业时,模型会在放入队列之前自动序列化,并在处理作业时从数据库中重新检索。但是,如果模型在作业等待工作人员处理时被删除,您的作业可能会失败并显示ModelNotFoundException.

为方便起见,您可以通过设置作业的deleteWhenMissingModels 财产给true.当此属性设置为true,Laravel 将悄悄地放弃工作而不引发异常:

/**
 * Delete the job if its models no longer exist.
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

修剪失败的作业

您可以修剪应用程序中的记录failed_jobs 通过调用表queue:prune-failed 工匠命令:

php artisan queue:prune-failed

默认情况下,将删除所有超过 24 小时的失败作业记录。如果您提供--hours 该命令的选项,将只保留在最后 N 小时内插入的失败作业记录。例如,以下命令将删除超过 48 小时前插入的所有失败作业记录:

php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的作业

Laravel 还支持将失败的作业记录存储在DynamoDB 而不是关系数据库表。但是,您必须创建一个 DynamoDB 表来存储所有失败的作业记录。通常,该表应命名为failed_jobs, 但您应该根据queue.failed.table 应用程序中的配置值queue 配置文件。

failed_jobs 表应该有一个名为的字符串主分区键application 和一个名为uuid.这application 密钥的一部分将包含您的应用程序名称,如name 应用程序中的配置值app 配置文件。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表来存储多个 Laravel 应用程序的失败作业。

此外,请确保您安装了 AWS 开发工具包,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

composer require aws/aws-sdk-php

接下来,设置queue.failed.driver 配置选项的值dynamodb.此外,您应该定义key,secret, 和region 失败作业配置数组中的配置选项。这些选项将用于通过 AWS 进行身份验证。当使用dynamodb 司机queue.failed.database 配置选项是不必要的:

'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败的作业存储

你可以指示 Laravel 丢弃失败的作业而不存储它们通过设置queue.failed.driver 配置选项的值null.通常,这可以通过QUEUE_FAILED_DRIVER 环境变量:

QUEUE_FAILED_DRIVER=null

失败的工作事件

如果您想注册一个将在作业失败时调用的事件侦听器,您可以使用Queue 门面的failing 方法。例如,我们可以从boot 的方法AppServiceProvider Laravel 中包含:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

从队列中清除作业

Note
使用时Horizon,你应该使用horizon:clear 命令从队列中清除作业而不是queue:clear 命令。

如果您想从默认连接的默认队列中删除所有作业,您可以使用queue:clear 工匠命令:

php artisan queue:clear

您还可以提供connection 参数和queue 从特定连接和队列中删除作业的选项:

php artisan queue:clear redis --queue=emails

Warning
从队列中清除作业仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在清除队列后最多 60 秒内发送到 SQS 队列的作业也可能会被删除。

监控您的队列

如果您的队列收到突然涌入的作业,它可能会不堪重负,导致等待作业完成的时间很长。如果您愿意,Laravel 可以在您的队列作业计数超过指定阈值时提醒您。

首先,您应该安排queue:monitor 命令每分钟运行.该命令接受您希望监控的队列的名称以及您想要的作业计数阈值:

php artisan queue:monitor redis:default,redis:deployments --max=100

单独安排此命令不足以触发通知,提醒您队列的不堪重负状态。当命令遇到作业计数超过阈值的队列时,一个Illuminate\Queue\Events\QueueBusy 事件将被发送。您可以在应用程序的EventServiceProvider 为了向您或您的开发团队发送通知:

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * Register any other events for your application.
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
                ->notify(new QueueHasLongWaitTime(
                    $event->connection,
                    $event->queue,
                    $event->size
                ));
    });
}

Testing

当测试调度作业的代码时,您可能希望指示 Laravel 不要实际执行作业本身,因为作业的代码可以直接和独立于调度它的代码进行测试。当然,要测试作业本身,您可以实例化一个作业实例并调用handle 方法直接在你的测试中。

您可以使用Queue 门面的fake 防止排队作业实际被推送到队列的方法。调用后Queue 门面的fake 方法,然后您可以断言应用程序试图将作业推送到队列:

<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // Perform order shipping...

        // Assert that no jobs were pushed...
        Queue::assertNothingPushed();

        // Assert a job was pushed to a given queue...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // Assert a job was pushed twice...
        Queue::assertPushed(ShipOrder::class, 2);

        // Assert a job was not pushed...
        Queue::assertNotPushed(AnotherJob::class);

        // Assert that a Closure was pushed to the queue...
        Queue::assertClosurePushed();
    }
}

您可以将闭包传递给assertPushed 或者assertNotPushed 方法以断言已推送通过给定“真实性测试”的作业。如果至少有一项作业被推送并通过了给定的真值测试,则断言将成功:

Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

伪造工作的一个子集

如果你只需要伪造特定的作业,而让你的其他作业正常执行,你可以将应该伪造的作业的类名传递给fake 方法:

public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
}

您可以伪造除一组指定作业之外的所有作业,使用except 方法:

Queue::fake()->except([
    ShipOrder::class,
]);

测试作业链

要测试作业链,您需要使用Bus Facade 的伪造能力。这Bus 门面的assertChained 方法可用于断言作业链 被派出。这assertChained 方法接受一个链式作业数组作为它的第一个参数:

use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

正如您在上面的示例中看到的,链式作业数组可能是作业类名称的数组。但是,您也可以提供一组实际的作业实例。这样做时,Laravel 将确保作业实例属于同一类,并且与您的应用程序调度的链式作业具有相同的属性值:

Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

您可以使用assertDispatchedWithoutChain 断言在没有作业链的情况下推送作业的方法:

Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试作业批次

Bus 门面的assertBatched 方法可用于断言一批工作 被派出。给的闭包assertBatched 方法接收一个实例Illuminate\Bus\PendingBatch,可用于检查批次中的作业:

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

测试作业/批处理交互

此外,您可能偶尔需要测试单个作业与其基础批处理的交互。例如,您可能需要测试作业是否取消了对其批次的进一步处理。为此,您需要通过withFakeBatch 方法。这withFakeBatch 方法返回一个包含作业实例和假批次的元组:

[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

职位活动

使用beforeafter 上的方法Queue facade,您可以指定在处理排队作业之前或之后执行的回调。这些回调是为仪表板执行额外日志记录或增量统计的绝佳机会。通常,您应该从boot 一个方法服务提供者.例如,我们可以使用AppServiceProvider Laravel 中包含:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

使用looping 上的方法Queue facade,您可以指定在工作人员尝试从队列中获取作业之前执行的回调。例如,您可以注册一个闭包来回滚任何由先前失败的作业保持打开状态的事务:

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});
豫ICP备18041297号-2