任务调度(定时任务)
一、@nestjs/schedule 简介
@nestjs/schedule 是 NestJS 官方调度库,基于 node-cron 封装,提供声明式定时任务:
bash
npm install @nestjs/schedule
npm install -D @types/crontypescript
// app.module.ts
import { ScheduleModule } from '@nestjs/schedule';
@Module({
imports: [
ScheduleModule.forRoot(), // 启动调度器
],
})
export class AppModule {}二、三种任务类型
| 类型 | 装饰器 | 触发时机 |
|---|---|---|
| Cron 表达式 | @Cron() | 按 cron 规则定时执行 |
| 固定间隔 | @Interval() | 每隔 N 毫秒执行一次 |
| 延迟执行 | @Timeout() | 应用启动后 N 毫秒执行一次 |
三、Cron 任务
typescript
// tasks/tasks.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
@Injectable()
export class TasksService {
private readonly logger = new Logger(TasksService.name);
// 使用预定义常量
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async cleanupExpiredTokens() {
this.logger.log('清理过期 Token...');
const count = await this.authService.cleanExpiredTokens();
this.logger.log(`已清理 ${count} 个过期 Token`);
}
// 自定义 cron 表达式:每天凌晨 2:30
@Cron('30 2 * * *', {
name: 'daily-backup', // 任务名称(用于动态控制)
timeZone: 'Asia/Shanghai', // 时区(默认 UTC)
})
async dailyBackup() {
await this.backupService.createSnapshot();
}
// 工作日(周一至周五)每小时执行
@Cron('0 * * * 1-5')
async businessHourTask() {
// ...
}
// 每月 1 日发送月报
@Cron('0 9 1 * *', { timeZone: 'Asia/Shanghai' })
async sendMonthlyReport() {
const users = await this.usersService.findAllActive();
for (const user of users) {
await this.emailService.sendMonthlyReport(user);
}
}
}Cron 表达式速查
┌─────── 秒(0-59) [可选]
│ ┌───── 分钟(0-59)
│ │ ┌─── 小时(0-23)
│ │ │ ┌─ 日(1-31)
│ │ │ │ ┌ 月(1-12)
│ │ │ │ │ ┌ 周几(0-7,0 和 7 都是周日)
│ │ │ │ │ │
* * * * * *
常用示例:
0 * * * * 每小时整点
*/15 * * * * 每 15 分钟
0 9 * * 1-5 工作日早 9 点
0 0 1 * * 每月 1 日午夜
0 0 * * 0 每周日午夜CronExpression 预定义常量
typescript
import { CronExpression } from '@nestjs/schedule';
CronExpression.EVERY_SECOND // '* * * * * *'
CronExpression.EVERY_MINUTE // '*/1 * * * *'
CronExpression.EVERY_5_MINUTES // '0 */5 * * * *'
CronExpression.EVERY_HOUR // '0 0-23/1 * * *'
CronExpression.EVERY_DAY_AT_MIDNIGHT // '0 0 * * *'
CronExpression.EVERY_DAY_AT_NOON // '0 12 * * *'
CronExpression.EVERY_WEEK // '0 0 * * 0'
CronExpression.EVERY_1ST_DAY_OF_MONTH // '0 0 1 * *'四、固定间隔与延迟任务
typescript
// 每 30 秒更新一次汇率(从应用启动起计时)
@Interval('update-rates', 30 * 1000)
async updateExchangeRates() {
const rates = await this.externalApi.fetchRates();
await this.cacheManager.set('exchange-rates', rates, 60 * 1000);
}
// 应用启动 10 秒后执行一次初始化(只执行一次)
@Timeout('init-data', 10 * 1000)
async initializeData() {
this.logger.log('执行启动初始化...');
await this.syncService.syncFromUpstream();
}五、动态任务控制
SchedulerRegistry 允许在运行时创建、暂停、删除任务:
typescript
// tasks/tasks.controller.ts
import { SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';
@Injectable()
export class TasksManagerService {
constructor(private schedulerRegistry: SchedulerRegistry) {}
// 暂停/恢复任务
pauseTask(name: string) {
const job = this.schedulerRegistry.getCronJob(name);
job.stop();
this.logger.log(`任务 ${name} 已暂停`);
}
resumeTask(name: string) {
const job = this.schedulerRegistry.getCronJob(name);
job.start();
this.logger.log(`任务 ${name} 已恢复`);
}
// 查看所有任务状态
getAllJobs() {
const cronJobs = [...this.schedulerRegistry.getCronJobs().entries()].map(
([name, job]) => ({
name,
nextRun: job.nextDate().toJSDate(),
running: job.running,
}),
);
const intervals = [...this.schedulerRegistry.getIntervals()];
const timeouts = [...this.schedulerRegistry.getTimeouts()];
return { cronJobs, intervals, timeouts };
}
// 动态创建任务(运行时)
addDynamicCronJob(name: string, cronExpression: string, callback: () => void) {
const job = new CronJob(cronExpression, callback, null, true, 'Asia/Shanghai');
this.schedulerRegistry.addCronJob(name, job);
this.logger.log(`动态任务 ${name} 已创建`);
}
// 删除任务
deleteTask(name: string) {
this.schedulerRegistry.deleteCronJob(name);
this.logger.log(`任务 ${name} 已删除`);
}
}六、任务锁(多实例防重复执行)
多实例部署时,同一时刻多个实例都会触发 cron,导致任务重复执行:
bash
npm install redlock # 基于 Redis 的分布式锁typescript
import Redlock from 'redlock';
@Injectable()
export class SafeTasksService {
private redlock: Redlock;
constructor(
@InjectRedis() private redis: Redis,
) {
this.redlock = new Redlock([redis], {
retryCount: 0, // 获取不到锁直接放弃(不重试)
});
}
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async dailyCleanup() {
const lockKey = 'lock:daily-cleanup';
let lock;
try {
// 尝试获取锁(10 秒超时)
lock = await this.redlock.acquire([lockKey], 10 * 1000);
this.logger.log('获取锁成功,开始执行每日清理');
await this.doCleanup();
} catch (err) {
if (err.name === 'ResourceLockedError') {
this.logger.log('其他实例已在执行,跳过');
return;
}
throw err;
} finally {
await lock?.release();
}
}
private async doCleanup() {
// 实际清理逻辑
}
}七、任务执行记录与监控
typescript
// 记录任务执行历史(方便排查问题)
@Injectable()
export class TasksService {
@Cron('0 2 * * *', { name: 'nightly-report' })
async generateNightlyReport() {
const startTime = Date.now();
let status: 'success' | 'failed' = 'success';
let errorMessage: string | null = null;
try {
await this.reportService.generate();
} catch (err) {
status = 'failed';
errorMessage = err.message;
this.logger.error('夜间报表生成失败', err.stack);
} finally {
const duration = Date.now() - startTime;
// 记录执行日志
await this.taskLogRepo.save({
taskName: 'nightly-report',
status,
duration,
errorMessage,
executedAt: new Date(),
});
this.logger.log(`夜间报表: ${status}, 耗时 ${duration}ms`);
}
}
}八、测试定时任务
typescript
describe('TasksService', () => {
let tasksService: TasksService;
beforeEach(async () => {
const module = await Test.createTestingModule({
imports: [ScheduleModule.forRoot()],
providers: [TasksService, ...mockProviders],
}).compile();
tasksService = module.get(TasksService);
});
it('cleanupExpiredTokens 应清理过期 Token', async () => {
mockAuthService.cleanExpiredTokens.mockResolvedValue(5);
// 直接调用方法(不依赖 cron 触发)
await tasksService.cleanupExpiredTokens();
expect(mockAuthService.cleanExpiredTokens).toHaveBeenCalledTimes(1);
});
});测试技巧: 不要测试 cron 何时触发(那是 node-cron 的责任),只测任务函数本身的业务逻辑。
可运行 Demo:
practice/07-extensions— Cron/Interval/Timeout Demo,接口:GET /scheduler/log
常见错误
| 错误 | 原因 | 解决 |
|---|---|---|
| Cron 任务不执行 | ScheduleModule.forRoot() 未加入 AppModule imports | 检查 imports: [ScheduleModule.forRoot()] |
| 时区错误(任务在错误时间执行) | 服务器默认 UTC,业务用北京时间 | @Cron('0 2 * * *', { timeZone: 'Asia/Shanghai' }) |
| 多实例下任务重复执行 | 每个实例各自触发 Cron | 用 Redlock 分布式锁,只让一个实例执行 |
| 任务执行时间超过触发间隔 | 上一次未完成下一次已开始 | @Cron 不会等待上次完成;用变量标记 isRunning 跳过重叠 |