队列与后台任务
一、为什么需要队列
某些操作不应该让用户等待:
❌ 同步处理(用户等待 5 秒):
POST /register → 创建账号 → 发送激活邮件(2s)→ 生成推荐内容(3s)→ 返回
✅ 队列异步处理(用户立即得到响应):
POST /register → 创建账号 → 入队两个 Job → 立即返回 201
↓ ↓
Worker: 发邮件 Worker: 生成推荐适合队列的场景:
- 发送邮件/短信/推送通知
- 图片/视频处理(压缩、转码)
- 生成 PDF 报表
- 第三方 API 调用(限速场景)
- 数据同步与 ETL
- 大批量数据库写入
二、安装配置(BullMQ)
BullMQ 是基于 Redis 的高性能队列,是 NestJS 官方推荐的队列方案:
bash
npm install @nestjs/bullmq bullmq ioredistypescript
// app.module.ts
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
connection: {
url: config.get('REDIS_URL', 'redis://localhost:6379'),
maxRetriesPerRequest: null, // BullMQ 要求
},
}),
}),
// 注册具体队列
BullModule.registerQueue(
{ name: 'email' }, // 邮件队列
{ name: 'image-process' }, // 图片处理队列
{ name: 'notification' }, // 通知队列
),
],
})三、生产者(Producer)
将 Job 加入队列:
typescript
// email/email.producer.ts
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
export interface SendEmailJobData {
to: string;
subject: string;
template: 'welcome' | 'reset-password' | 'monthly-report';
context: Record<string, any>;
}
@Injectable()
export class EmailProducer {
constructor(
@InjectQueue('email') private emailQueue: Queue,
) {}
// 加入队列(立即执行)
async sendWelcomeEmail(user: { id: number; email: string; name: string }) {
await this.emailQueue.add(
'send-welcome', // Job 名称
{
to: user.email,
subject: '欢迎加入!',
template: 'welcome',
context: { name: user.name },
} as SendEmailJobData,
{
attempts: 3, // 失败重试 3 次
backoff: { type: 'exponential', delay: 2000 }, // 指数退避
removeOnComplete: { count: 100 }, // 成功后保留最近 100 条记录
removeOnFail: { count: 500 }, // 失败记录保留 500 条
},
);
}
// 延迟执行(1 小时后发送)
async scheduleFollowUpEmail(userId: number) {
await this.emailQueue.add(
'send-followup',
{ userId },
{ delay: 60 * 60 * 1000 }, // 1 小时后
);
}
// 优先级队列(数字越小优先级越高)
async sendUrgentAlert(email: string, message: string) {
await this.emailQueue.add(
'send-alert',
{ to: email, message },
{ priority: 1 }, // 最高优先级
);
}
// 批量添加(原子操作)
async sendBulkNotification(emails: string[], content: string) {
const jobs = emails.map(email => ({
name: 'send-notification',
data: { to: email, content },
opts: { attempts: 3 },
}));
await this.emailQueue.addBulk(jobs);
}
}四、消费者(Consumer / Worker)
处理队列中的 Job:
typescript
// email/email.consumer.ts
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('email', {
concurrency: 5, // 最多同时处理 5 个 Job
})
export class EmailConsumer extends WorkerHost {
private readonly logger = new Logger(EmailConsumer.name);
constructor(private emailService: EmailService) {
super();
}
// 处理所有 Job(通过 job.name 区分)
async process(job: Job<SendEmailJobData>): Promise<void> {
this.logger.log(`处理 Job: ${job.name} #${job.id}`);
switch (job.name) {
case 'send-welcome':
await this.handleWelcome(job);
break;
case 'send-followup':
await this.handleFollowUp(job);
break;
default:
throw new Error(`未知 Job 类型: ${job.name}`);
}
}
private async handleWelcome(job: Job<SendEmailJobData>) {
const { to, subject, template, context } = job.data;
// 更新进度(对于长耗时任务)
await job.updateProgress(10);
await this.emailService.send({ to, subject, template, context });
await job.updateProgress(100);
}
private async handleFollowUp(job: Job<{ userId: number }>) {
const user = await this.usersService.findOne(job.data.userId);
if (!user) return; // 用户已删除,静默跳过
await this.emailService.send({
to: user.email,
subject: '你还记得我们吗?',
template: 'followup',
context: { name: user.name },
});
}
// 任务完成回调
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.log(`Job ${job.id} 完成,耗时 ${Date.now() - job.timestamp}ms`);
}
// 任务失败回调(所有重试都失败后触发)
@OnWorkerEvent('failed')
onFailed(job: Job, error: Error) {
this.logger.error(`Job ${job.id} 最终失败: ${error.message}`, {
jobName: job.name,
data: job.data,
attempts: job.attemptsMade,
stack: error.stack,
});
// 可在此发送告警(Slack、邮件等)
}
}五、图片处理队列(完整示例)
typescript
// image/image.producer.ts
export interface ProcessImageJobData {
originalPath: string;
userId: number;
targetSizes: number[];
}
@Injectable()
export class ImageProducer {
constructor(@InjectQueue('image-process') private queue: Queue) {}
async enqueueProcess(filePath: string, userId: number) {
const job = await this.queue.add(
'resize',
{ originalPath: filePath, userId, targetSizes: [200, 400, 800] },
{
attempts: 2,
timeout: 30 * 1000, // 30 秒超时
},
);
return { jobId: job.id }; // 返回 jobId 供前端轮询状态
}
async getJobStatus(jobId: string) {
const job = await this.queue.getJob(jobId);
if (!job) throw new NotFoundException('Job 不存在');
const state = await job.getState(); // waiting/active/completed/failed
return {
id: job.id,
state,
progress: job.progress,
result: job.returnvalue,
failedReason: job.failedReason,
};
}
}typescript
// image/image.consumer.ts
@Processor('image-process', { concurrency: 2 }) // 图片处理限制并发为 2
export class ImageConsumer extends WorkerHost {
async process(job: Job<ProcessImageJobData>) {
const { originalPath, userId, targetSizes } = job.data;
const results: Record<string, string> = {};
for (let i = 0; i < targetSizes.length; i++) {
const size = targetSizes[i];
const outputPath = `${originalPath}-${size}.webp`;
await sharp(originalPath)
.resize(size, size, { fit: 'cover' })
.webp({ quality: 80 })
.toFile(outputPath);
results[`${size}w`] = outputPath;
// 更新进度
await job.updateProgress(Math.floor(((i + 1) / targetSizes.length) * 100));
}
// 更新数据库
await this.userRepo.update(userId, { avatarUrls: results });
return results; // 存储在 job.returnvalue 中
}
}六、队列监控面板(Bull Board)
bash
npm install @bull-board/api @bull-board/nestjs @bull-board/expresstypescript
// app.module.ts
import { BullBoardModule } from '@bull-board/nestjs';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
@Module({
imports: [
BullBoardModule.forRoot({
route: '/queues', // 访问路径:http://localhost:3000/queues
adapter: ExpressAdapter,
}),
BullBoardModule.forFeature({
name: 'email',
adapter: BullMQAdapter,
}),
BullBoardModule.forFeature({
name: 'image-process',
adapter: BullMQAdapter,
}),
],
})访问 http://localhost:3000/queues 可以看到队列状态、Job 列表、重试失败任务等。
七、优先级与速率限制
typescript
// 速率限制:每秒最多处理 10 个 Job
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
})
// Worker 侧速率限制
@Processor('email', {
limiter: {
max: 10, // 每个时间窗口最多 10 个
duration: 1000, // 1000ms 时间窗口 = 10 个/秒
},
})八、测试队列
typescript
describe('EmailProducer', () => {
let producer: EmailProducer;
let mockQueue: jest.Mocked<Queue>;
beforeEach(async () => {
mockQueue = { add: jest.fn().mockResolvedValue({ id: '123' }) } as any;
const module = await Test.createTestingModule({
providers: [
EmailProducer,
{ provide: getQueueToken('email'), useValue: mockQueue },
],
}).compile();
producer = module.get(EmailProducer);
});
it('欢迎邮件入队', async () => {
await producer.sendWelcomeEmail({ id: 1, email: 'a@b.com', name: '张三' });
expect(mockQueue.add).toHaveBeenCalledWith(
'send-welcome',
expect.objectContaining({ to: 'a@b.com', template: 'welcome' }),
expect.objectContaining({ attempts: 3 }),
);
});
});可运行 Demo:
practice/07-extensions— 队列 Producer/Consumer/重试 Demo,接口:POST /queue/jobs
常见错误
| 错误 | 原因 | 解决 |
|---|---|---|
maxRetriesPerRequest must be null | BullMQ 要求 ioredis 连接设置此参数 | connection: { maxRetriesPerRequest: null } |
| Worker 不消费任务 | @Processor 模块未注册到 AppModule | 确认 Consumer 类在 providers 数组中 |
Job 卡在 active 状态 | Worker 崩溃且未 ack | BullMQ 会在 lockDuration(默认 30s)后自动重新入队 |
| 重复消费(多实例) | 多个 Worker 实例同时消费同一 Job | BullMQ 通过 Redis 锁保证原子性,此问题通常是配置问题 |
bull-board 页面空白 | BullBoardModule.forFeature 未注册队列 | 每个队列都要调用 forFeature 注册 |
| 延迟任务不执行 | Redis 连接断开后定时器丢失 | 重启 Worker 会自动从 Redis 恢复延迟任务 |