Skip to content

队列与后台任务

一、为什么需要队列

某些操作不应该让用户等待:

❌ 同步处理(用户等待 5 秒):
  POST /register → 创建账号 → 发送激活邮件(2s)→ 生成推荐内容(3s)→ 返回

✅ 队列异步处理(用户立即得到响应):
  POST /register → 创建账号 → 入队两个 Job → 立即返回 201
                                ↓           ↓
                          Worker: 发邮件  Worker: 生成推荐

适合队列的场景:

  • 发送邮件/短信/推送通知
  • 图片/视频处理(压缩、转码)
  • 生成 PDF 报表
  • 第三方 API 调用(限速场景)
  • 数据同步与 ETL
  • 大批量数据库写入

二、安装配置(BullMQ)

BullMQ 是基于 Redis 的高性能队列,是 NestJS 官方推荐的队列方案:

bash
npm install @nestjs/bullmq bullmq ioredis
typescript
// app.module.ts
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        connection: {
          url: config.get('REDIS_URL', 'redis://localhost:6379'),
          maxRetriesPerRequest: null,  // BullMQ 要求
        },
      }),
    }),

    // 注册具体队列
    BullModule.registerQueue(
      { name: 'email' },         // 邮件队列
      { name: 'image-process' }, // 图片处理队列
      { name: 'notification' },  // 通知队列
    ),
  ],
})

三、生产者(Producer)

将 Job 加入队列:

typescript
// email/email.producer.ts
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

export interface SendEmailJobData {
  to: string;
  subject: string;
  template: 'welcome' | 'reset-password' | 'monthly-report';
  context: Record<string, any>;
}

@Injectable()
export class EmailProducer {
  constructor(
    @InjectQueue('email') private emailQueue: Queue,
  ) {}

  // 加入队列(立即执行)
  async sendWelcomeEmail(user: { id: number; email: string; name: string }) {
    await this.emailQueue.add(
      'send-welcome',                     // Job 名称
      {
        to: user.email,
        subject: '欢迎加入!',
        template: 'welcome',
        context: { name: user.name },
      } as SendEmailJobData,
      {
        attempts: 3,                      // 失败重试 3 次
        backoff: { type: 'exponential', delay: 2000 },  // 指数退避
        removeOnComplete: { count: 100 }, // 成功后保留最近 100 条记录
        removeOnFail: { count: 500 },     // 失败记录保留 500 条
      },
    );
  }

  // 延迟执行(1 小时后发送)
  async scheduleFollowUpEmail(userId: number) {
    await this.emailQueue.add(
      'send-followup',
      { userId },
      { delay: 60 * 60 * 1000 },  // 1 小时后
    );
  }

  // 优先级队列(数字越小优先级越高)
  async sendUrgentAlert(email: string, message: string) {
    await this.emailQueue.add(
      'send-alert',
      { to: email, message },
      { priority: 1 },  // 最高优先级
    );
  }

  // 批量添加(原子操作)
  async sendBulkNotification(emails: string[], content: string) {
    const jobs = emails.map(email => ({
      name: 'send-notification',
      data: { to: email, content },
      opts: { attempts: 3 },
    }));
    await this.emailQueue.addBulk(jobs);
  }
}

四、消费者(Consumer / Worker)

处理队列中的 Job:

typescript
// email/email.consumer.ts
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('email', {
  concurrency: 5,   // 最多同时处理 5 个 Job
})
export class EmailConsumer extends WorkerHost {
  private readonly logger = new Logger(EmailConsumer.name);

  constructor(private emailService: EmailService) {
    super();
  }

  // 处理所有 Job(通过 job.name 区分)
  async process(job: Job<SendEmailJobData>): Promise<void> {
    this.logger.log(`处理 Job: ${job.name} #${job.id}`);

    switch (job.name) {
      case 'send-welcome':
        await this.handleWelcome(job);
        break;
      case 'send-followup':
        await this.handleFollowUp(job);
        break;
      default:
        throw new Error(`未知 Job 类型: ${job.name}`);
    }
  }

  private async handleWelcome(job: Job<SendEmailJobData>) {
    const { to, subject, template, context } = job.data;

    // 更新进度(对于长耗时任务)
    await job.updateProgress(10);

    await this.emailService.send({ to, subject, template, context });

    await job.updateProgress(100);
  }

  private async handleFollowUp(job: Job<{ userId: number }>) {
    const user = await this.usersService.findOne(job.data.userId);
    if (!user) return;  // 用户已删除,静默跳过

    await this.emailService.send({
      to: user.email,
      subject: '你还记得我们吗?',
      template: 'followup',
      context: { name: user.name },
    });
  }

  // 任务完成回调
  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    this.logger.log(`Job ${job.id} 完成,耗时 ${Date.now() - job.timestamp}ms`);
  }

  // 任务失败回调(所有重试都失败后触发)
  @OnWorkerEvent('failed')
  onFailed(job: Job, error: Error) {
    this.logger.error(`Job ${job.id} 最终失败: ${error.message}`, {
      jobName: job.name,
      data: job.data,
      attempts: job.attemptsMade,
      stack: error.stack,
    });
    // 可在此发送告警(Slack、邮件等)
  }
}

五、图片处理队列(完整示例)

typescript
// image/image.producer.ts
export interface ProcessImageJobData {
  originalPath: string;
  userId: number;
  targetSizes: number[];
}

@Injectable()
export class ImageProducer {
  constructor(@InjectQueue('image-process') private queue: Queue) {}

  async enqueueProcess(filePath: string, userId: number) {
    const job = await this.queue.add(
      'resize',
      { originalPath: filePath, userId, targetSizes: [200, 400, 800] },
      {
        attempts: 2,
        timeout: 30 * 1000,  // 30 秒超时
      },
    );
    return { jobId: job.id };  // 返回 jobId 供前端轮询状态
  }

  async getJobStatus(jobId: string) {
    const job = await this.queue.getJob(jobId);
    if (!job) throw new NotFoundException('Job 不存在');

    const state = await job.getState();  // waiting/active/completed/failed
    return {
      id: job.id,
      state,
      progress: job.progress,
      result: job.returnvalue,
      failedReason: job.failedReason,
    };
  }
}
typescript
// image/image.consumer.ts
@Processor('image-process', { concurrency: 2 })  // 图片处理限制并发为 2
export class ImageConsumer extends WorkerHost {
  async process(job: Job<ProcessImageJobData>) {
    const { originalPath, userId, targetSizes } = job.data;

    const results: Record<string, string> = {};

    for (let i = 0; i < targetSizes.length; i++) {
      const size = targetSizes[i];
      const outputPath = `${originalPath}-${size}.webp`;

      await sharp(originalPath)
        .resize(size, size, { fit: 'cover' })
        .webp({ quality: 80 })
        .toFile(outputPath);

      results[`${size}w`] = outputPath;

      // 更新进度
      await job.updateProgress(Math.floor(((i + 1) / targetSizes.length) * 100));
    }

    // 更新数据库
    await this.userRepo.update(userId, { avatarUrls: results });

    return results; // 存储在 job.returnvalue 中
  }
}

六、队列监控面板(Bull Board)

bash
npm install @bull-board/api @bull-board/nestjs @bull-board/express
typescript
// app.module.ts
import { BullBoardModule } from '@bull-board/nestjs';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

@Module({
  imports: [
    BullBoardModule.forRoot({
      route: '/queues',           // 访问路径:http://localhost:3000/queues
      adapter: ExpressAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'email',
      adapter: BullMQAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'image-process',
      adapter: BullMQAdapter,
    }),
  ],
})

访问 http://localhost:3000/queues 可以看到队列状态、Job 列表、重试失败任务等。


七、优先级与速率限制

typescript
// 速率限制:每秒最多处理 10 个 Job
BullModule.registerQueue({
  name: 'email',
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  },
})

// Worker 侧速率限制
@Processor('email', {
  limiter: {
    max: 10,          // 每个时间窗口最多 10 个
    duration: 1000,   // 1000ms 时间窗口 = 10 个/秒
  },
})

八、测试队列

typescript
describe('EmailProducer', () => {
  let producer: EmailProducer;
  let mockQueue: jest.Mocked<Queue>;

  beforeEach(async () => {
    mockQueue = { add: jest.fn().mockResolvedValue({ id: '123' }) } as any;

    const module = await Test.createTestingModule({
      providers: [
        EmailProducer,
        { provide: getQueueToken('email'), useValue: mockQueue },
      ],
    }).compile();

    producer = module.get(EmailProducer);
  });

  it('欢迎邮件入队', async () => {
    await producer.sendWelcomeEmail({ id: 1, email: 'a@b.com', name: '张三' });

    expect(mockQueue.add).toHaveBeenCalledWith(
      'send-welcome',
      expect.objectContaining({ to: 'a@b.com', template: 'welcome' }),
      expect.objectContaining({ attempts: 3 }),
    );
  });
});

可运行 Demo: practice/07-extensions — 队列 Producer/Consumer/重试 Demo,接口:POST /queue/jobs


常见错误

错误原因解决
maxRetriesPerRequest must be nullBullMQ 要求 ioredis 连接设置此参数connection: { maxRetriesPerRequest: null }
Worker 不消费任务@Processor 模块未注册到 AppModule确认 Consumer 类在 providers 数组中
Job 卡在 active 状态Worker 崩溃且未 ackBullMQ 会在 lockDuration(默认 30s)后自动重新入队
重复消费(多实例)多个 Worker 实例同时消费同一 JobBullMQ 通过 Redis 锁保证原子性,此问题通常是配置问题
bull-board 页面空白BullBoardModule.forFeature 未注册队列每个队列都要调用 forFeature 注册
延迟任务不执行Redis 连接断开后定时器丢失重启 Worker 会自动从 Redis 恢复延迟任务

NestJS 深度学习体系