laravel队列基本使用

1.队列创建

1.1 队列连接

配置文件config/queue.php connections 配置项指定了后台队列服务连接,QUEUE_CONNECTION指定方式

laravel5.7版本以前是配置是Queue driver

任务在队列中需要经过两个过程:一个是任务入队,就是将要执行的任务,放到队列中去的过程,所有会发生这一过程的业务、对象、调用等等,可以统称为生产者;与之对应的,将任务从队列中取出,并执行的过程,叫做任务出队,所有会发生这一过程的业务、对象、调用等等,可以统称为消费者。

1.2 队列驱动

常用链接驱动:redis database sqs beanstalkd sync rabbitmq

  • sync:属于同步驱动(阻塞),不要配置队列监听,适合本地或者个人测试
  • Null:不启动队列,生产者产生的任务被丢弃
  • Database:数据库队列驱动,生产者产生的任务放入数据库
  • Redis:Redis 队列驱动,生产者产生的任务放入 Redis
  • beanstalkd:轻量级消息队列 https://learnku.com/articles/18687
  • 驱动包:
  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • Redis: predis/predis ~1.0

1.3 依赖的服务

队列服务由服务提供者 QueueServiceProvider 注册

如果队列驱动使用的时 Database,那么 $connection 指的就是 Illuminate\Queue\DatabaseQueue 的实例,如果队列驱动使用的时 Redis,那么 $connection 指的就是 Illuminate\Queue\RedisQueue 的实例。 $connection 的 pop 方法会从队列存储中取出下一个 payload,经过队列驱动的转化,得到不同的 QueueJob 实例, $job 对象,调用 $job 的 fire 方法,任务就开始执行。

1.4 queue.php参数说明

  • retry_after:该配置项的目的是定义任务在执行以后多少秒后释放回队列参数项 --timeout 的值应该始终小于配置项 retry_after 的值,这是为了确保队列进程总在任务重试以前关闭。如果 --timeout 比retry_after 大,那么任务可能被执行两次。

  • block_for:使用 Redis 队列时,可以使用 block_for 配置项来指定驱动在迭代队列进程循环并重新轮询 Redis 数据库之前等待可用队列任务的时间。

1.5 创建任务

创建任务php artisan make:job queueJob Job方法一般由构造函数和handle方法组成,handle 方法在任务被处理的时候调用,我们可以在任务的 handle 方法中进行依赖注入。Laravel 服务容器会自动注入这些依赖。

注:二进制数据,如原生图片内容,在传递给队列任务之前先经过 base64_encode 方法处理,此外,该任务被推送到队列时将不会被序列化为 JSON 格式。

任务中间件throttle

任务中间件可以在执行队列任务前封装一些自定义逻辑,从而减少任务本身的模板代码量 以redis限流执行为例:

2.1 任务分发

dispatch 方法触发任务指派动作。当执行 Job::dispatch() 时,会实例化一个 Illuminate\Foundation\Bus\PendingDispatch 对象 PendingDispatch

任务是如何被放入队列中的呢?这就引出了 payload 这个概念。当 Dispatcher 这个服务通过 dispatch 方法派发任务时,会通过队列服务,将任务 push 到队列中,在 push 的过程中会执行 见:Illuminate\Queue\Queue 入队对象是一个实例: 通过实例的方式,将任务推送到队列中,在 createPayload 的环节,执行的是 createObjectPayload 方法,这时可以利用系统提供的队列机制,实例只需要有一个 handle 方法作为执行方法,来承接 CallQueuedHandler::handle () 传递的调用

入队对象是一个字符串: 通过字符串的方式,将任务推送到队列中,在 createPayload 的环节,执行的是 createStringPayload 方法 Queue::push实例

常用的dispatch: 在command中分发 助手函数 dispatch(new queueJob())->onQueue('quee-name')(helper->dispatch) 在控制器中分发 $this->dispatch(new queueJob())->onQueue('quee-name') (trait DispatchesJobs 延迟分发:

delay连续操作做放在dispatch后面

其他方式分发: dispatchIf:等于true投递; (laravel7) dispatchUnless:和dispatchIf相反,为false时投递 dispatch_now/dispatchNow:如果你立即想要分发任务(同步),可以使用 dispatchNow 方法。使用该方法时,对应任务不会被推送到队列,而是立即在当前进程中运行,和驱动类型sync类似

dispatchAfterResponse:响应发送到客户端之后分发 dispatchAfterResponse 方法在响应发送给用户浏览器之后再分发任务,这样一来,即使队列任务在执行的情况下,用户仍然继续使用应用,这种场景适用于任务耗时较长的情况,比如发送邮件;也可以通过传入闭包到 dispatch 方法,再链接 afterResponse 方法的方式实现上述同样的效果 dispatch(function () { //todo 发送邮件耗时任务 })->afterResponse();

任务链withChain 任务链就是指定一个需要在一个序列中执行的队列任务列表,如果序列中的某个任务失败,其它任务将不再运行。要执行一个队列任务链,可以使用任意可分发任务上的 withChain 方法:

3.1 队列执行

基于时间的尝试次数:retryUntil 定义在任务失败前的最大尝试次数外,在指定时间内允许任务的最大尝试次数 队列执行 php artisan queue:work queue:work queue:listen listen代码修改后不需要手动重启队列进程,但是命令性能不及 queue:work --tries 最大失败次数 (代码优先级高于命令行) --timeout (handle中方法执行时间)

  • queue:work 默认只执行一次队列请求, 当请求执行完成后就终止;
  • queue:listen 监听队列请求, 只要运行着, 就能一直接受请求, 除非手动终止;
  • queue:work --daemon 同 listen 一样, 只要运行着, 就能一直接受请求, 不一样的地方是在这个运行模式下, 当新的请求到来的时候, 不重新加载整个框架, 而是直接 fire 动作.
  • php artisan queue:work redis/database(指定连接)
  • --queue 指定队列名称
  • --once 处理队列中的单个任务,只会执行一条
  • --sleep=3 休眠时间

任务出队是建立在消费者开始工作的基础之上,一类消费者就是 Worker,队列处理器。通过命令行 php artisan queue:work 来启动一个 Worker。Worker 在 daemon 模式下,会不断的尝试从队列中取出任务并执行,这一过程有以下执行环节:

  • 第一步:检查是否要暂停队列,是则暂停一段时间,否则经行下一步
  • 第二步:取出当前要执行的任务,并给任务设置一个超时进程,
  • 第三步:执行任务,如果当前没有任务,暂停一段时间
  • 第四步:检查是否要停止队列
  • 遇到三种情况会停止队列:
  • Worker 进程收到 SIGTERM 信号
  • 使用内存超过限制
  • 收到重启命令

事件监听器的原理是,通过 Illuminate\Events\CallQueuedListener 来作为一个特殊的 “任务”,环节的重点其实是,事件、监听器、CallQueuedListener 三者之间是如何进行关联的,也就是事件监听机制。AppServiceProvider->boot

4.1 配置守护进程

supervisor

  • 安装:yum install supervisor / apt-get install supervisor conf配置说明:

  • sudo supervisorctl reread

  • sudo supervisorctl update

  • sudo supervisorctl start laravel-worker:*

  • --stop-when-empty 处理所有队列任务然后退出

  • --queue=high,low 队列优先级,high队列任务会优先执行

  • --sleep=3 没有新的有效任务产生时的休眠时间

  • --delay=3 重试失败任务之前需要等待多少秒,默认是立即重试

  • retryAfter:在每个任务的基础上配置失败的任务重试延迟

基于脚本的重试:(运行失败的重试、投递失败的重试)

打 赏