ThinkPHP8 think-queue 官方队列扩展详解与案例

think-queue 详解与使用指南

think-queue 是 ThinkPHP 官方提供的队列扩展,专注于处理异步任务(如邮件发送、短信通知、数据同步等)。通过将耗时操作放入队列异步执行,可避免阻塞主请求,显著提升系统响应速度和稳定性。

一、什么是 think-queue?

think-queue 基于 ThinkPHP 的依赖注入和事件机制,实现了生产-消费模型

  • 生产者:在业务逻辑中生成任务(如用户注册后触发“发送欢迎邮件”任务),并将任务推送到队列。
  • 消费者:独立进程持续监听队列,取出任务并后台执行(如专门的进程负责发送邮件)。

通过这种机制,耗时操作(如调用第三方接口)无需阻塞主请求,用户可立即获得响应,任务在后台异步完成。

二、核心优势

  1. 提升系统响应速度
    同步执行耗时操作(如发送邮件)会导致用户等待(例如注册接口因发送邮件耗时3秒)。使用队列后,主请求仅需将任务推送到队列(毫秒级),即可立即返回,大幅优化用户体验。

  2. 解耦业务逻辑
    例如“用户注册”的核心逻辑是“保存用户信息”,而“发送欢迎邮件”“赠送新人礼包”等属于附加逻辑。通过队列拆分这些逻辑,可避免主流程代码臃肿,便于后续扩展(如新增“短信通知”只需添加新任务)。

  3. 削峰填谷,抗并发
    面对流量峰值(如秒杀活动),大量请求触发的耗时操作可能耗尽系统资源(如数据库连接超限)。队列可缓冲任务,消费者按系统承载能力逐步处理,避免瞬间压力击垮系统。

  4. 任务重试与失败处理
    支持失败自动重试(如配置“重试3次,间隔5秒”),避免因网络波动、第三方服务临时故障导致任务丢失。重试上限后会记录失败原因,便于人工排查。

  5. 多驱动支持
    适配不同环境,支持多种存储驱动:

    • Redis:高性能、支持持久化,推荐生产环境使用。
    • Database:基于数据库表存储,无需额外服务,适合小流量场景。
    • Beanstalkd:轻量级分布式队列,适合高并发场景。
    • Sync:同步执行,仅用于开发调试。
  6. 与 ThinkPHP 深度集成
    支持依赖注入(如在任务中注入模型/服务)、框架日志系统(自动集成任务日志)、多环境配置(通过 config/queue.php 统一管理)。

三、核心特性与使用流程

1. 核心特性

  • 任务优先级:可指定任务执行顺序(如“订单支付通知”优先于“营销推送”)。
  • 延迟执行:支持延迟任务(如“订单创建后30分钟未支付自动取消”)。
  • 超时控制:设置任务执行超时时间,避免无限阻塞。
  • 队列分组:不同类型任务可放入不同队列(如“email”队列处理邮件,“sms”队列处理短信),便于单独管理。

2. 基本使用流程

以“用户注册后异步发送邮件”为例,步骤如下:

① 安装扩展
1
composer require topthink/think-queue
② 配置队列驱动

config/queue.php 中配置驱动(推荐 Redis):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
return [
'default' => 'redis', // 默认驱动
'connections' => [
'redis' => [
'type' => 'redis',
'host' => '127.0.0.1', // Redis主机
'port' => 6379, // 端口
'password' => '', // 密码(如有)
'select' => 0, // 数据库索引
'timeout' => 0, // 超时时间
'persistent' => false, // 是否长连接
],
],
];
③ 封装邮件发送工具

创建 app\utils\Emailer 作为邮件发送工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
<?php
namespace app\utils;

use PHPMailer\PHPMailer\PHPMailer;
use PHPMailer\PHPMailer\Exception;
use think\facade\Config;
use think\facade\Log;

class Emailer
{
private $emailer;

/**
* 初始化邮件配置
* @throws Exception
*/
function __construct()
{
$this->emailer = new PHPMailer(true);
$this->emailer->isSMTP();
$this->emailer->Host = Config::get("email.host");
$this->emailer->SMTPAuth = true;
$this->emailer->Username = Config::get("email.send_user");
$this->emailer->Password = Config::get("email.password");
$this->emailer->SMTPSecure = Config::get("email.encryption");
$this->emailer->Port = Config::get("email.port");
$this->emailer->CharSet = 'UTF-8';
$this->emailer->SMTPAutoTLS = true;
}

/**
* 发送邮件
* @param string $reciver 收件人邮箱
* @param string $subject 邮件主题
* @param string $content 邮件内容
* @param bool $isHTML 是否为HTML内容
* @return array 发送结果
*/
function send(
string $reciver = "2022471677@qq.com",
string $subject = "",
string $content = "",
bool $isHTML = false
): array {
try {
// 参数校验
if (empty($subject)) throw new \InvalidArgumentException("邮件主题不能为空");
if (empty($content)) throw new \InvalidArgumentException("邮件内容不能为空");
if (!filter_var($reciver, FILTER_VALIDATE_EMAIL)) {
throw new \InvalidArgumentException("收件人邮箱格式不正确: {$reciver}");
}

// 清理历史收件人
$this->emailer->clearAddresses();

// 设置发件人
$sendUser = Config::get("email.send_user");
$this->emailer->setFrom($sendUser, Config::get("email.send_name", "28.7Blog"));

// 设置收件人及内容
$this->emailer->addAddress($reciver);
$this->emailer->isHTML($isHTML);
$this->emailer->Subject = $subject;
$this->emailer->Body = $content;
if ($isHTML) {
$this->emailer->AltBody = strip_tags($content); // 纯文本替代内容
}

// 执行发送
$this->emailer->send();
Log::channel("queuejob")->info("邮件发送成功,收件人: {$reciver},主题: {$subject}");
return [
'success' => true,
'message' => "邮件已成功发送至: {$reciver}"
];

} catch (Exception $e) {
$errorMsg = "邮件发送失败: " . $e->getMessage();
Log::channel("queuejob")->error("{$errorMsg},收件人: {$reciver},主题: {$subject}");
return ['success' => false, 'message' => $errorMsg];
} catch (\InvalidArgumentException $e) {
$errorMsg = "参数错误: " . $e->getMessage();
Log::channel("queuejob")->error($errorMsg);
return ['success' => false, 'message' => $errorMsg];
} catch (\Exception $e) {
$errorMsg = "未知错误: " . $e->getMessage();
Log::channel("queuejob")->error($errorMsg);
return ['success' => false, 'message' => $errorMsg];
}
}
}
④ 创建队列任务类

app/job 目录下创建 SendEmailJob.php,封装异步发送邮件的逻辑:
但是请注意,如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?php
namespace app\job;

use app\utils\Emailer;
use think\queue\Job;

class SendEmailJob
{
/**
* 执行队列任务
* @param Job $job 任务对象
* @param array $data 任务参数(收件人、主题、内容等)
*/
public function fire(Job $job, array $data)
{
try {
// 解析任务参数
$reciver = $data['reciver'] ?? '';
$subject = $data['subject'] ?? '';
$content = $data['content'] ?? '';
$isHTML = $data['isHTML'] ?? false;

// 调用邮件工具发送
$emailer = new Emailer();
$result = $emailer->send($reciver, $subject, $content, $isHTML);

// 处理任务结果
if ($result['success']) {
$job->delete(); // 成功:删除任务
Log::channel("queuejob")->info("邮件任务执行成功,收件人: {$reciver}");
} else {
// 失败:重试(最多3次)
if ($job->attempts() < 3) {
$job->release(3); // 3秒后重试
} else {
$job->delete();
Log::channel("queuejob")->error("邮件任务重试超限,收件人: {$reciver},原因: {$result['message']}");
}
}
} catch (\Exception $e) {
Log::channel("queuejob")->error("邮件任务异常: {$e->getMessage()}");
$job->delete();
}
}
}
⑤ 推送任务到队列-发布任务

think\facade\Queue::push($job, $data = ‘’, $queue = null) 和 think\facade\Queue::later($delay, $job, $data = ‘’, $queue = null) 两个方法,前者是立即执行,后者是在$delay秒后执行
命名空间是app\job的,比如上面的例子一,写Job1类名即可
其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1、app\lib\job\Job2@task2

  • $data 是你要传到任务里的参数
  • $queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填

例如:在业务逻辑(如注册接口)中,通过 Queue::push() 推送任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
namespace app\service;

use think\facade\Queue;
use app\job\SendEmailJob;

class UserService
{
public function register(array $userData): array
{
$jobData = [
'reciver' => $userData['email'], // 收件人邮箱(需确保已解码)
'subject' => "欢迎注册",
'content' => "亲爱的用户,感谢注册我们的平台!",
'isHTML' => false
];
// 推送参数:任务类、任务数据、队列名称(可选)
Queue::push(SendEmailJob::class, $jobData, 'email');

return [
'success' => true,
'message' => "注册成功,欢迎邮件将尽快发送"
];
}
}
⑥ 启动队列消费者

执行命令启动消费者进程,监听并处理队列任务:

1
2
3
4
5
# 监听名为 "email" 的队列(与推送时的队列名称一致)
php think queue:listen --queue email

# 或使用 work 命令(处理完当前任务后退出,适合定时调用)
php think queue:work --queue email

四、适用场景

  • 通知类任务:邮件发送、短信通知、APP推送等。
  • 耗时操作:大数据导出、报表生成、图片/视频处理等。
  • 定时任务:订单超时取消、会员等级过期处理(结合延迟任务实现)。
  • 峰值流量处理:秒杀订单创建、活动参与记录等。

五、与其他异步方案的对比

方案 优势 劣势 适用场景
think-queue 跨环境兼容(无需Swoole)、易用性高 依赖外部存储(如Redis)、性能略低 大多数PHP环境、中小流量场景
Swoole Task 基于内存,性能极高 依赖Swoole扩展,部署复杂 高并发场景、有Swoole环境
定时任务(Crontab) 适合周期性任务 实时性差,不适合即时异步任务 每日数据统计、定期清理等

六、使用注意事项

  1. 消费者进程守护:生产环境需用 supervisor 等工具托管消费者进程,确保意外退出后自动重启。
  2. 任务幂等性:任务可能因重试被多次执行,需保证逻辑“重复执行结果一致”(如用唯一ID避免重复发送邮件)。
  3. 资源释放:任务中涉及的数据库连接、文件句柄等需手动释放,避免资源泄漏。

总结:think-queue 是 ThinkPHP 处理异步任务的首选工具,通过异步化、解耦和削峰能力,显著提升系统响应速度和稳定性,尤其适合通知、耗时操作等场景。