CQRS and Event Sourcing

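1. The Problem CQRS Solves

CQRS (Command Query Responsibility Segregation) splits a system's operations into two kinds:

  • Command: an operation that changes system state (create/update/delete) and, in its strict form, returns no data
  • Query: an operation that reads data and does not change state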

Traditional (mixed):     CQRS (separated):
Service                  CommandBus
  ├── create()             ├── CreateUserCommand
  ├── findAll()            ├── UpdateUserCommand
  ├── update()             └── DeleteUserCommand
  └── delete()
                         QueryBus
                           ├── GetUserQuery
                           ├── ListUsersQuery
                           └── SearchUsersQuery
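
To make the split concrete, here is a minimal sketch of two buses that route a message object to a handler by its class. `MessageBus`, `UserRow`, and the in-memory `users` map are hypothetical names for illustration only; in a NestJS application the `CommandBus` and `QueryBus` from `@nestjs/cqrs` play this role.

```typescript
// Hypothetical minimal bus, for illustration; @nestjs/cqrs provides the real ones.
type Ctor<T> = new (...args: any[]) => T;

interface Handler<M, R> {
  execute(message: M): R;
}

class MessageBus {
  private readonly handlers = new Map<Function, Handler<any, any>>();

  // Associate a message class with the single handler responsible for it.
  register<M, R>(type: Ctor<M>, handler: Handler<M, R>): void {
    this.handlers.set(type, handler);
  }

  // Look up the handler by the message's class and run it.
  execute<R>(message: object): R {
    const handler = this.handlers.get(message.constructor);
    if (!handler) {
      throw new Error(`No handler registered for ${message.constructor.name}`);
    }
    return handler.execute(message) as R;
  }
}

class CreateUserCommand {
  constructor(public readonly email: string) {}
}

class GetUserQuery {
  constructor(public readonly id: number) {}
}

interface UserRow {
  id: number;
  email: string;
}

// Shared in-memory storage standing in for a database.
const users = new Map<number, UserRow>();

const commandBus = new MessageBus();
const queryBus = new MessageBus();

// Write side: changes state and returns only the new identifier.
commandBus.register(CreateUserCommand, {
  execute(cmd: CreateUserCommand): number {
    const id = users.size + 1;
    users.set(id, { id, email: cmd.email });
    return id;
  },
});

// Read side: returns data and never mutates it.
queryBus.register(GetUserQuery, {
  execute(q: GetUserQuery): UserRow | undefined {
    return users.get(q.id);
  },
});

const newId = commandBus.execute<number>(new CreateUserCommand('a@example.com'));
const found = queryBus.execute<UserRow | undefined>(new GetUserQuery(newId));
```

Sending a message to a bus that has no handler for its class throws, which is the same failure mode as forgetting to register a handler in a module's providers.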

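When it fits:

  • Read and write loads differ widely (read-heavy, write-light)
  • The read and write sides need to be optimized independently (e.g. a Redis cache on the read side, PostgreSQL on the write side)
  • Complex domain logic (DDD scenarios)
  • A complete audit log of operations is required

When it does not fit:

  • Simple CRUD (over-engineering; expect roughly 2-3x the code)
  • The team is not familiar with DDD concepts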

2. Installation and Module Setup

bash
npm install @nestjs/cqrs
typescript
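// users/users.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { TypeOrmModule } from '@nestjs/typeorm';
// Imports of this module's own handlers, entity, controller and service are omitted for brevity.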

const CommandHandlers = [CreateUserHandler, UpdateUserHandler, DeleteUserHandler];
const QueryHandlers = [GetUserQueryHandler, ListUsersQueryHandler];
const EventHandlers = [UserCreatedEventHandler, UserDeletedEventHandler];

@Module({
  imports: [CqrsModule, TypeOrmModule.forFeature([User])],
  controllers: [UsersController],
  providers: [
    UsersService,
    ...CommandHandlers,
    ...QueryHandlers,
    ...EventHandlers,
  ],
})
export class UsersModule {}

3. Implementing Commands

Defining Commands

typescript
// users/commands/impl/create-user.command.ts
export class CreateUserCommand {
  constructor(
    public readonly dto: CreateUserDto,
    // Can carry request context (trace ID, operator ID, etc.)
    public readonly requestedBy?: number,
  ) {}
}

export class UpdateUserCommand {
  constructor(
    public readonly id: number,
    public readonly dto: UpdateUserDto,
    public readonly requestedBy: number,
  ) {}
}

export class DeleteUserCommand {
  constructor(
    public readonly id: number,
    public readonly requestedBy: number,
  ) {}
}

Command Handler

typescript
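// users/commands/handlers/create-user.handler.ts
import { ConflictException, NotFoundException } from '@nestjs/common';
import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import * as bcrypt from 'bcrypt';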

@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: CreateUserCommand): Promise<User> {
    const { dto } = command;

    // Business validation
    const existing = await this.userRepo.findOne({ where: { email: dto.email } });
    if (existing) throw new ConflictException('Email is already registered');

    // Perform the write
    const user = this.userRepo.create({
      ...dto,
      password: await bcrypt.hash(dto.password, 12),
    });
    const saved = await this.userRepo.save(user);

    // Publish a domain event (other modules can subscribe)
    this.eventBus.publish(new UserCreatedEvent(saved.id, saved.email));

    return saved;
  }
}

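@CommandHandler(DeleteUserCommand)
export class DeleteUserHandler implements ICommandHandler<DeleteUserCommand> {
  // userRepo and eventBus are used below, so they must be injected here as well
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: DeleteUserCommand): Promise<void> {
    const user = await this.userRepo.findOne({ where: { id: command.id } });
    if (!user) throw new NotFoundException('User not found');

    // Soft delete keeps the row and only sets the entity's delete-date column
    await this.userRepo.softDelete(command.id);
    this.eventBus.publish(new UserDeletedEvent(command.id, command.requestedBy));
  }
}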

4. Implementing Queries

Defining Queries

typescript
// users/queries/impl/get-user.query.ts
export class GetUserQuery {
  constructor(public readonly id: number) {}
}

export class ListUsersQuery {
  constructor(
    public readonly page: number = 1,
    public readonly limit: number = 10,
    public readonly role?: string,
    public readonly keyword?: string,
  ) {}
}
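
The `page` and `limit` fields of `ListUsersQuery` map onto an offset and a page count with simple arithmetic. The sketch below works through it; `pageWindow` and `totalPages` are hypothetical helper names, and the clamping of invalid input is an added assumption rather than something the tutorial's handler does.

```typescript
interface PageWindow {
  skip: number; // rows to skip before the requested page
  take: number; // rows to return
}

// Convert a 1-based page number into an offset/limit pair.
function pageWindow(page: number, limit: number): PageWindow {
  // Assumption: clamp to at least 1 so bad input cannot yield a negative offset.
  const safePage = Math.max(1, Math.floor(page));
  const safeLimit = Math.max(1, Math.floor(limit));
  return { skip: (safePage - 1) * safeLimit, take: safeLimit };
}

// Number of pages needed to show `total` rows.
function totalPages(total: number, limit: number): number {
  return Math.ceil(total / Math.max(1, Math.floor(limit)));
}

const thirdPage = pageWindow(3, 10); // skips the first 20 rows
const pages = totalPages(95, 10);    // 95 rows need 10 pages of 10
const clamped = pageWindow(0, 10);   // page 0 is treated as page 1
```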

Query Handler (can have its own read model/cache)

typescript
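// users/queries/handlers/get-user.handler.ts
import { Inject, NotFoundException } from '@nestjs/common';
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
// In recent NestJS versions CACHE_MANAGER is exported from @nestjs/cache-manager
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import { Cache } from 'cache-manager';
import { Repository } from 'typeorm';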

@QueryHandler(GetUserQuery)
export class GetUserQueryHandler implements IQueryHandler<GetUserQuery> {
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
    @Inject(CACHE_MANAGER) private readonly cache: Cache,
  ) {}

  async execute(query: GetUserQuery): Promise<User> {
    // Reads can be cached independently of the write side
    const cacheKey = `user:${query.id}`;
    const cached = await this.cache.get<User>(cacheKey);
    if (cached) return cached;

    const user = await this.userRepo.findOne({
      where: { id: query.id },
      relations: { posts: true },
    });
    if (!user) throw new NotFoundException('User not found');

    // Cache for 60 seconds, assuming cache-manager v5+ where the TTL is in milliseconds (v4 used seconds)
    await this.cache.set(cacheKey, user, 60000);
    return user;
  }
}

@QueryHandler(ListUsersQuery)
export class ListUsersQueryHandler implements IQueryHandler<ListUsersQuery> {
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
  ) {}

  async execute(query: ListUsersQuery) {
    const { page, limit, role, keyword } = query;
    const qb = this.userRepo.createQueryBuilder('user');

    if (role) qb.andWhere('user.role = :role', { role });
    // Parenthesize the OR so it stays grouped under the AND with the role filter (ILIKE is PostgreSQL-specific)
    if (keyword) qb.andWhere('(user.name ILIKE :kw OR user.email ILIKE :kw)', { kw: `%${keyword}%` });

    const [items, total] = await qb
      .orderBy('user.createdAt', 'DESC')
      .skip((page - 1) * limit)
      .take(limit)
      .getManyAndCount();

    return { items, total, page, limit, totalPages: Math.ceil(total / limit) };
  }
}
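
The caching in `GetUserQueryHandler` follows the cache-aside pattern: check the cache, fall back to the database on a miss, then store the result with a TTL. The sketch below isolates that flow with a hypothetical in-memory `TtlCache` and a fake `loadFromDb`; the injectable clock is an assumption added so that expiry can be demonstrated without waiting.

```typescript
// Hypothetical in-memory TTL cache; a real handler would use the injected cache manager.
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; expiresAt: number }>();

  // The clock is injectable so expiry can be exercised deterministically.
  constructor(private readonly now: () => number = Date.now) {}

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // expired: drop it and report a miss
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V, ttlMs: number): void {
    this.entries.set(key, { value, expiresAt: this.now() + ttlMs });
  }

  delete(key: string): void {
    this.entries.delete(key);
  }
}

interface UserView {
  id: number;
  name: string;
}

let clock = 0;   // fake time in milliseconds
let dbReads = 0; // counts how often the "database" is hit
const cache = new TtlCache<UserView>(() => clock);

// Stand-in for a repository lookup.
function loadFromDb(id: number): UserView {
  dbReads++;
  return { id, name: `user-${id}` };
}

// Cache-aside read: cache first, database on a miss, then populate the cache.
function getUser(id: number): UserView {
  const key = `user:${id}`;
  const cached = cache.get(key);
  if (cached) return cached;
  const user = loadFromDb(id);
  cache.set(key, user, 60000);
  return user;
}

getUser(1);    // miss: reads the database
getUser(1);    // hit: served from the cache
clock = 60000; // 60 seconds later the entry has expired
getUser(1);    // miss again
```

A command handler that updates or deletes a user would typically also call `cache.delete('user:<id>')`, so that readers do not keep seeing the old value until the TTL runs out.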

5. Using the Buses in a Controller/Service

typescript
// users/users.service.ts
import { Injectable } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';

@Injectable()
export class UsersService {
  constructor(
    private readonly commandBus: CommandBus,
    private readonly queryBus: QueryBus,
  ) {}

  // Commands: change state
  createUser(dto: CreateUserDto, requestedBy?: number) {
    return this.commandBus.execute(new CreateUserCommand(dto, requestedBy));
  }

  updateUser(id: number, dto: UpdateUserDto, requestedBy: number) {
    return this.commandBus.execute(new UpdateUserCommand(id, dto, requestedBy));
  }

  deleteUser(id: number, requestedBy: number) {
    return this.commandBus.execute(new DeleteUserCommand(id, requestedBy));
  }

  // Queries: read data
  getUser(id: number) {
    return this.queryBus.execute(new GetUserQuery(id));
  }

  listUsers(page: number, limit: number, role?: string) {
    return this.queryBus.execute(new ListUsersQuery(page, limit, role));
  }
}

6. Domain Events

typescript
// users/events/impl/user-created.event.ts
export class UserCreatedEvent {
  constructor(
    public readonly userId: number,
    public readonly email: string,
    public readonly createdAt: Date = new Date(),
  ) {}
}

// users/events/handlers/user-created.handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';

@EventsHandler(UserCreatedEvent)
export class UserCreatedEventHandler implements IEventHandler<UserCreatedEvent> {
  constructor(
    private readonly emailService: EmailService,
    private readonly auditService: AuditService,
  ) {}

  async handle(event: UserCreatedEvent) {
    // Independent side effects: with allSettled, one failing does not reject the other
    await Promise.allSettled([
      this.emailService.sendWelcomeEmail(event.email),
      this.auditService.log('user_created', event.userId),
    ]);
  }
}

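// Subscribing to multiple events
@EventsHandler(UserCreatedEvent, UserUpdatedEvent)
export class UserAuditHandler implements IEventHandler<UserCreatedEvent | UserUpdatedEvent> {
  // auditLog is assumed to be an injected AuditService that exposes record()
  constructor(private readonly auditLog: AuditService) {}

  handle(event: UserCreatedEvent | UserUpdatedEvent) {
    this.auditLog.record(event);
  }
}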
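
The value of an event bus is that the publisher does not know its subscribers and that one failing subscriber does not stop the others. The sketch below shows that isolation with a hypothetical synchronous `SimpleEventBus`; it is not the `EventBus` from `@nestjs/cqrs`, and collecting errors into a returned array is just one possible design.

```typescript
type EventCtor<E> = new (...args: any[]) => E;
type Listener<E> = (event: E) => void;

// Hypothetical synchronous bus, for illustration only.
class SimpleEventBus {
  private readonly listeners = new Map<Function, Listener<any>[]>();

  subscribe<E>(type: EventCtor<E>, listener: Listener<E>): void {
    const list = this.listeners.get(type) ?? [];
    list.push(listener);
    this.listeners.set(type, list);
  }

  // Calls every listener for the event's class; a throwing listener is
  // recorded and the remaining listeners still run.
  publish(event: object): unknown[] {
    const errors: unknown[] = [];
    for (const listener of this.listeners.get(event.constructor) ?? []) {
      try {
        listener(event);
      } catch (err) {
        errors.push(err);
      }
    }
    return errors;
  }
}

class UserCreatedEvent {
  constructor(
    public readonly userId: number,
    public readonly email: string,
  ) {}
}

const bus = new SimpleEventBus();
const auditLog: string[] = [];

// First subscriber simulates a failing mail service.
bus.subscribe(UserCreatedEvent, () => {
  throw new Error('SMTP unavailable');
});

// Second subscriber still runs and writes the audit entry.
bus.subscribe(UserCreatedEvent, (e) => {
  auditLog.push(`user_created:${e.userId}`);
});

const failures = bus.publish(new UserCreatedEvent(1, 'a@example.com'));
```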

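7. Event Sourcing

Event Sourcing is a more radical pattern than CQRS: instead of storing the current state, it stores only the sequence of events, and the current state is derived by replaying them.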

Traditional:     users table → { id: 1, email: '...', balance: 100 }
Event Sourcing:  events table → [
  { type: 'UserCreated',     data: { email: '...' } },
  { type: 'BalanceDeposit',  data: { amount: 200 } },
  { type: 'BalanceWithdraw', data: { amount: 100 } },
]
// Current balance = replaying the events above: 0 + 200 - 100 = 100
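
Replaying is a fold over the event list: start from an empty state and apply each event in order. A minimal sketch using the three events from the example above follows; the `AccountEvent` and `AccountState` types and the `applyEvent`/`replay` function names are assumptions introduced for illustration.

```typescript
type AccountEvent =
  | { type: 'UserCreated'; data: { email: string } }
  | { type: 'BalanceDeposit'; data: { amount: number } }
  | { type: 'BalanceWithdraw'; data: { amount: number } };

interface AccountState {
  email: string;
  balance: number;
}

// Pure transition function: previous state + one event = next state.
function applyEvent(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'UserCreated':
      return { ...state, email: event.data.email };
    case 'BalanceDeposit':
      return { ...state, balance: state.balance + event.data.amount };
    case 'BalanceWithdraw':
      return { ...state, balance: state.balance - event.data.amount };
  }
}

// Current state is the result of folding every event over the initial state.
function replay(events: AccountEvent[]): AccountState {
  const initial: AccountState = { email: '', balance: 0 };
  return events.reduce(applyEvent, initial);
}

const history: AccountEvent[] = [
  { type: 'UserCreated', data: { email: 'a@example.com' } },
  { type: 'BalanceDeposit', data: { amount: 200 } },
  { type: 'BalanceWithdraw', data: { amount: 100 } },
];

const current = replay(history);
// Replaying only a prefix gives the state at an earlier point in time.
const afterDeposit = replay(history.slice(0, 2));
```

Because `applyEvent` is pure, replaying a prefix of the history is all that "time travel" requires.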

Aggregate Root

typescript
// users/aggregates/user.aggregate.ts
import { BadRequestException } from '@nestjs/common';
import { AggregateRoot } from '@nestjs/cqrs';

export class UserAggregate extends AggregateRoot {
  private id: number;
  private email: string;
  private balance: number = 0;
  private version: number = 0;

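  // Factory method (creates a new user)
  static create(dto: CreateUserDto): UserAggregate {
    const user = new UserAggregate();
    // Apply an event instead of mutating fields directly; assumes the DTO carries a pre-generated id
    user.apply(new UserCreatedEvent(dto.id, dto.email));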
    return user;
  }

  // Deposit
  deposit(amount: number) {
    if (amount <= 0) throw new BadRequestException('Amount must be greater than 0');
    this.apply(new BalanceDepositedEvent(this.id, amount));
  }

  // Event handlers: apply() looks up on<EventClassName> by name and calls it automatically
  onUserCreatedEvent(event: UserCreatedEvent) {
    this.id = event.userId;
    this.email = event.email;
    this.version++;
  }

  onBalanceDepositedEvent(event: BalanceDepositedEvent) {
    this.balance += event.amount;
    this.version++;
  }
}
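
The convention that `apply()` finds a method named `on<EventClassName>` can be reproduced in a few lines, which helps when debugging why state does not change. The sketch below uses a hypothetical `MiniAggregate` base class as a simplified stand-in for `AggregateRoot`; `Wallet`, `BalanceWithdrawnEvent`, and the `withdraw` rule are assumptions added for the example.

```typescript
// Hypothetical stand-in for AggregateRoot, reduced to the naming convention.
abstract class MiniAggregate {
  private uncommitted: object[] = [];

  // Record the event (unless replaying history) and dispatch it to on<ClassName>.
  protected apply(event: object, fromHistory = false): void {
    if (!fromHistory) this.uncommitted.push(event);
    const handler = (this as any)[`on${event.constructor.name}`];
    if (typeof handler === 'function') handler.call(this, event);
  }

  // Rebuild state from stored events without marking them as new.
  loadFromHistory(events: object[]): void {
    for (const e of events) this.apply(e, true);
  }

  getUncommittedEvents(): object[] {
    return [...this.uncommitted];
  }

  commit(): void {
    this.uncommitted = [];
  }
}

class BalanceDepositedEvent {
  constructor(public readonly amount: number) {}
}

class BalanceWithdrawnEvent {
  constructor(public readonly amount: number) {}
}

class Wallet extends MiniAggregate {
  balance = 0;
  version = 0;

  // Commands validate first, then emit an event; they never touch state directly.
  deposit(amount: number): void {
    if (amount <= 0) throw new Error('Amount must be greater than 0');
    this.apply(new BalanceDepositedEvent(amount));
  }

  withdraw(amount: number): void {
    if (amount > this.balance) throw new Error('Insufficient funds');
    this.apply(new BalanceWithdrawnEvent(amount));
  }

  // State changes live only in the on<EventClassName> methods.
  onBalanceDepositedEvent(e: BalanceDepositedEvent): void {
    this.balance += e.amount;
    this.version++;
  }

  onBalanceWithdrawnEvent(e: BalanceWithdrawnEvent): void {
    this.balance -= e.amount;
    this.version++;
  }
}

const wallet = new Wallet();
wallet.deposit(200);
wallet.withdraw(100);
const pending = wallet.getUncommittedEvents();

// A second instance reaches the same state purely by replaying the events.
const rebuilt = new Wallet();
rebuilt.loadFromHistory(pending);
```

Keeping every state change inside the `on...` methods is what makes replay reliable: the command methods only validate and emit. Name-based dispatch depends on the class name surviving the build, so a minifier that renames classes would break it.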

Event Store

typescript
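// Event store table design
// event_store: { id, aggregate_id, aggregate_type, event_type, payload, version, occurred_at }
import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';

@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    // EventStore is an application-defined service (not part of @nestjs/cqrs) that writes to event_store
    private readonly eventStore: EventStore,
    private readonly publisher: EventPublisher,
  ) {}

  async execute(command: CreateUserCommand) {
    // mergeObjectContext connects the aggregate to the EventBus; without it, commit() publishes nothing
    const aggregate = this.publisher.mergeObjectContext(UserAggregate.create(command.dto));

    // Persist all uncommitted events
    const events = aggregate.getUncommittedEvents();
    await this.eventStore.save(events, 'User', command.dto.id);

    // Publish the events to their handlers (which update the read model)
    aggregate.commit();
  }
}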
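
The `version` column in the table design is what makes appends safe and replays ordered. As a rough sketch under stated assumptions, the hypothetical `InMemoryEventStore` below rejects an append when the caller's expected version is stale (optimistic concurrency) and always returns events sorted by version; a real store would enforce this with a unique constraint on `(aggregate_id, version)` in the database.

```typescript
interface StoredEvent {
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  payload: unknown;
  version: number;
  occurredAt: Date;
}

interface NewEvent {
  eventType: string;
  payload: unknown;
}

// Hypothetical in-memory store mirroring the event_store columns.
class InMemoryEventStore {
  private readonly rows: StoredEvent[] = [];

  // Highest version stored for the aggregate, or 0 if it has no events yet.
  private currentVersion(aggregateId: string): number {
    return this.rows
      .filter((r) => r.aggregateId === aggregateId)
      .reduce((max, r) => Math.max(max, r.version), 0);
  }

  // Append events only if nobody else has written since the caller loaded the aggregate.
  append(aggregateType: string, aggregateId: string, expectedVersion: number, events: NewEvent[]): number {
    const actual = this.currentVersion(aggregateId);
    if (actual !== expectedVersion) {
      throw new Error(`Concurrency conflict: expected v${expectedVersion}, found v${actual}`);
    }
    let version = actual;
    for (const e of events) {
      version += 1;
      this.rows.push({
        aggregateId,
        aggregateType,
        eventType: e.eventType,
        payload: e.payload,
        version,
        occurredAt: new Date(),
      });
    }
    return version;
  }

  // Equivalent of: SELECT ... WHERE aggregate_id = ? ORDER BY version ASC
  load(aggregateId: string): StoredEvent[] {
    return this.rows
      .filter((r) => r.aggregateId === aggregateId)
      .sort((a, b) => a.version - b.version);
  }
}

const store = new InMemoryEventStore();
const v1 = store.append('User', 'u1', 0, [
  { eventType: 'UserCreated', payload: { email: 'a@example.com' } },
]);
const v3 = store.append('User', 'u1', v1, [
  { eventType: 'BalanceDeposited', payload: { amount: 200 } },
  { eventType: 'BalanceWithdrawn', payload: { amount: 100 } },
]);

// A writer that still believes the aggregate is at version 1 is rejected.
let conflict = false;
try {
  store.append('User', 'u1', 1, [{ eventType: 'BalanceDeposited', payload: { amount: 5 } }]);
} catch (e) {
  conflict = true;
}

const loadedTypes = store.load('u1').map((e) => e.eventType);
```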

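8. CQRS vs Event Sourcing

| Feature | CQRS | Event Sourcing |
| --- | --- | --- |
| Read/write separation | ✅ | (usually combined with CQRS) |
| Complete audit log | ❌ (needs extra implementation) | ✅ (built in) |
| Time travel | | ✅ (replay events) |
| Implementation complexity | | |
| Query performance | Good (independent read model) | Requires extra projections |
| Best suited for | Complex writes, simple reads | Finance, auditing, event-driven systems |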
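
A projection is the piece that turns the event stream into a queryable read model. The following is a minimal sketch, assuming a hypothetical `UserSummaryProjection` kept in memory and an illustrative `UserEvent` union; a production projection would write to a table or cache instead of a `Map`.

```typescript
type UserEvent =
  | { type: 'UserCreated'; userId: number; email: string }
  | { type: 'BalanceDeposited'; userId: number; amount: number }
  | { type: 'UserDeleted'; userId: number };

interface UserSummary {
  userId: number;
  email: string;
  balance: number;
}

// Hypothetical read model: one denormalized row per user, updated event by event.
class UserSummaryProjection {
  private readonly rows = new Map<number, UserSummary>();

  handle(event: UserEvent): void {
    switch (event.type) {
      case 'UserCreated':
        this.rows.set(event.userId, { userId: event.userId, email: event.email, balance: 0 });
        break;
      case 'BalanceDeposited': {
        const row = this.rows.get(event.userId);
        if (row) row.balance += event.amount;
        break;
      }
      case 'UserDeleted':
        this.rows.delete(event.userId);
        break;
    }
  }

  // Queries read the precomputed rows; no replay happens at query time.
  get(userId: number): UserSummary | undefined {
    return this.rows.get(userId);
  }

  list(): UserSummary[] {
    return Array.from(this.rows.values());
  }
}

const projection = new UserSummaryProjection();
const stream: UserEvent[] = [
  { type: 'UserCreated', userId: 1, email: 'a@example.com' },
  { type: 'UserCreated', userId: 2, email: 'b@example.com' },
  { type: 'BalanceDeposited', userId: 1, amount: 200 },
  { type: 'UserDeleted', userId: 2 },
];
for (const event of stream) projection.handle(event);

const summary = projection.get(1);
```

Since the projection is derived data, it can be dropped and rebuilt at any time by feeding it the full event history again.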

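Recommendation: adopt CQRS first and introduce Event Sourcing only when there is a clear need. Event Sourcing adds substantial complexity and should not be adopted lightly.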


Runnable demo: practice/05-microservices (reference implementation of the CQRS CommandHandler/QueryHandler)


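Common Mistakes

| Mistake | Cause | Fix |
| --- | --- | --- |
| CommandBus.execute() cannot find a handler | The handler is not registered in the module's providers | Register every CommandHandler in the Module's providers |
| Events replay in the wrong order | Events are not sorted by version | Query events with ORDER BY version ASC |
| State does not update after AggregateRoot apply() | The on[EventName]() method is not implemented | apply(new MyEvent()) automatically calls onMyEvent(); the method name must match |
| CQRS reads and writes disagree (eventual-consistency lag) | A query runs right after the command completes, before the read model is updated | Do not depend on the read model when returning the command result, or apply optimistic updates in the UI layer |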

NestJS In-Depth Learning Series