Skip to content

任务调度(定时任务)

一、@nestjs/schedule 简介

@nestjs/schedule 是 NestJS 官方调度库,基于 node-cron 封装,提供声明式定时任务:

bash
npm install @nestjs/schedule
npm install -D @types/cron
typescript
// app.module.ts
import { ScheduleModule } from '@nestjs/schedule';

@Module({
  imports: [
    ScheduleModule.forRoot(),  // 启动调度器
  ],
})
export class AppModule {}

二、三种任务类型

类型装饰器触发时机
Cron 表达式@Cron()按 cron 规则定时执行
固定间隔@Interval()每隔 N 毫秒执行一次
延迟执行@Timeout()应用启动后 N 毫秒执行一次

三、Cron 任务

typescript
// tasks/tasks.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';

@Injectable()
export class TasksService {
  private readonly logger = new Logger(TasksService.name);

  // 使用预定义常量
  @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
  async cleanupExpiredTokens() {
    this.logger.log('清理过期 Token...');
    const count = await this.authService.cleanExpiredTokens();
    this.logger.log(`已清理 ${count} 个过期 Token`);
  }

  // 自定义 cron 表达式:每天凌晨 2:30
  @Cron('30 2 * * *', {
    name: 'daily-backup',        // 任务名称(用于动态控制)
    timeZone: 'Asia/Shanghai',   // 时区(默认 UTC)
  })
  async dailyBackup() {
    await this.backupService.createSnapshot();
  }

  // 工作日(周一至周五)每小时执行
  @Cron('0 * * * 1-5')
  async businessHourTask() {
    // ...
  }

  // 每月 1 日发送月报
  @Cron('0 9 1 * *', { timeZone: 'Asia/Shanghai' })
  async sendMonthlyReport() {
    const users = await this.usersService.findAllActive();
    for (const user of users) {
      await this.emailService.sendMonthlyReport(user);
    }
  }
}

Cron 表达式速查

┌─────── 秒(0-59)    [可选]
│ ┌───── 分钟(0-59)
│ │ ┌─── 小时(0-23)
│ │ │ ┌─ 日(1-31)
│ │ │ │ ┌ 月(1-12)
│ │ │ │ │ ┌ 周几(0-7,0 和 7 都是周日)
│ │ │ │ │ │
* * * * * *

常用示例:
  0 * * * *       每小时整点
  */15 * * * *    每 15 分钟
  0 9 * * 1-5     工作日早 9 点
  0 0 1 * *       每月 1 日午夜
  0 0 * * 0       每周日午夜

CronExpression 预定义常量

typescript
import { CronExpression } from '@nestjs/schedule';

CronExpression.EVERY_SECOND              // '* * * * * *'
CronExpression.EVERY_MINUTE             // '*/1 * * * *'
CronExpression.EVERY_5_MINUTES          // '0 */5 * * * *'
CronExpression.EVERY_HOUR               // '0 0-23/1 * * *'
CronExpression.EVERY_DAY_AT_MIDNIGHT    // '0 0 * * *'
CronExpression.EVERY_DAY_AT_NOON        // '0 12 * * *'
CronExpression.EVERY_WEEK               // '0 0 * * 0'
CronExpression.EVERY_1ST_DAY_OF_MONTH   // '0 0 1 * *'

四、固定间隔与延迟任务

typescript
// 每 30 秒更新一次汇率(从应用启动起计时)
@Interval('update-rates', 30 * 1000)
async updateExchangeRates() {
  const rates = await this.externalApi.fetchRates();
  await this.cacheManager.set('exchange-rates', rates, 60 * 1000);
}

// 应用启动 10 秒后执行一次初始化(只执行一次)
@Timeout('init-data', 10 * 1000)
async initializeData() {
  this.logger.log('执行启动初始化...');
  await this.syncService.syncFromUpstream();
}

五、动态任务控制

SchedulerRegistry 允许在运行时创建、暂停、删除任务:

typescript
// tasks/tasks.controller.ts
import { SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';

@Injectable()
export class TasksManagerService {
  constructor(private schedulerRegistry: SchedulerRegistry) {}

  // 暂停/恢复任务
  pauseTask(name: string) {
    const job = this.schedulerRegistry.getCronJob(name);
    job.stop();
    this.logger.log(`任务 ${name} 已暂停`);
  }

  resumeTask(name: string) {
    const job = this.schedulerRegistry.getCronJob(name);
    job.start();
    this.logger.log(`任务 ${name} 已恢复`);
  }

  // 查看所有任务状态
  getAllJobs() {
    const cronJobs = [...this.schedulerRegistry.getCronJobs().entries()].map(
      ([name, job]) => ({
        name,
        nextRun: job.nextDate().toJSDate(),
        running: job.running,
      }),
    );
    const intervals = [...this.schedulerRegistry.getIntervals()];
    const timeouts = [...this.schedulerRegistry.getTimeouts()];
    return { cronJobs, intervals, timeouts };
  }

  // 动态创建任务(运行时)
  addDynamicCronJob(name: string, cronExpression: string, callback: () => void) {
    const job = new CronJob(cronExpression, callback, null, true, 'Asia/Shanghai');
    this.schedulerRegistry.addCronJob(name, job);
    this.logger.log(`动态任务 ${name} 已创建`);
  }

  // 删除任务
  deleteTask(name: string) {
    this.schedulerRegistry.deleteCronJob(name);
    this.logger.log(`任务 ${name} 已删除`);
  }
}

六、任务锁(多实例防重复执行)

多实例部署时,同一时刻多个实例都会触发 cron,导致任务重复执行:

bash
npm install redlock   # 基于 Redis 的分布式锁
typescript
import Redlock from 'redlock';

@Injectable()
export class SafeTasksService {
  private redlock: Redlock;

  constructor(
    @InjectRedis() private redis: Redis,
  ) {
    this.redlock = new Redlock([redis], {
      retryCount: 0,  // 获取不到锁直接放弃(不重试)
    });
  }

  @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
  async dailyCleanup() {
    const lockKey = 'lock:daily-cleanup';
    let lock;

    try {
      // 尝试获取锁(10 秒超时)
      lock = await this.redlock.acquire([lockKey], 10 * 1000);
      this.logger.log('获取锁成功,开始执行每日清理');
      await this.doCleanup();
    } catch (err) {
      if (err.name === 'ResourceLockedError') {
        this.logger.log('其他实例已在执行,跳过');
        return;
      }
      throw err;
    } finally {
      await lock?.release();
    }
  }

  private async doCleanup() {
    // 实际清理逻辑
  }
}

七、任务执行记录与监控

typescript
// 记录任务执行历史(方便排查问题)
@Injectable()
export class TasksService {
  @Cron('0 2 * * *', { name: 'nightly-report' })
  async generateNightlyReport() {
    const startTime = Date.now();
    let status: 'success' | 'failed' = 'success';
    let errorMessage: string | null = null;

    try {
      await this.reportService.generate();
    } catch (err) {
      status = 'failed';
      errorMessage = err.message;
      this.logger.error('夜间报表生成失败', err.stack);
    } finally {
      const duration = Date.now() - startTime;

      // 记录执行日志
      await this.taskLogRepo.save({
        taskName: 'nightly-report',
        status,
        duration,
        errorMessage,
        executedAt: new Date(),
      });

      this.logger.log(`夜间报表: ${status}, 耗时 ${duration}ms`);
    }
  }
}

八、测试定时任务

typescript
describe('TasksService', () => {
  let tasksService: TasksService;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      imports: [ScheduleModule.forRoot()],
      providers: [TasksService, ...mockProviders],
    }).compile();

    tasksService = module.get(TasksService);
  });

  it('cleanupExpiredTokens 应清理过期 Token', async () => {
    mockAuthService.cleanExpiredTokens.mockResolvedValue(5);

    // 直接调用方法(不依赖 cron 触发)
    await tasksService.cleanupExpiredTokens();

    expect(mockAuthService.cleanExpiredTokens).toHaveBeenCalledTimes(1);
  });
});

测试技巧: 不要测试 cron 何时触发(那是 node-cron 的责任),只测任务函数本身的业务逻辑。


可运行 Demo: practice/07-extensions — Cron/Interval/Timeout Demo,接口:GET /scheduler/log


常见错误

错误原因解决
Cron 任务不执行ScheduleModule.forRoot() 未加入 AppModule imports检查 imports: [ScheduleModule.forRoot()]
时区错误(任务在错误时间执行)服务器默认 UTC,业务用北京时间@Cron('0 2 * * *', { timeZone: 'Asia/Shanghai' })
多实例下任务重复执行每个实例各自触发 Cron用 Redlock 分布式锁,只让一个实例执行
任务执行时间超过触发间隔上一次未完成下一次已开始@Cron 不会等待上次完成;用变量标记 isRunning 跳过重叠

NestJS 深度学习体系