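CQRS and Event Sourcing
1. The Problem CQRS Solves
CQRS (Command Query Responsibility Segregation) splits a system's operations into two kinds:
- Command: an operation that changes system state (create/update/delete) and ideally returns no data
- Query: an operation that reads data and does not change state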
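The split can be illustrated without any framework. The sketch below is a minimal in-process bus, assuming one handler per message class; `SimpleBus`, `CreateUser`, `GetUser` and the in-memory `users` map are hypothetical names made up for this illustration and are not part of `@nestjs/cqrs`.

```typescript
// Minimal, framework-free sketch of command/query separation.
// All names here (SimpleBus, CreateUser, GetUser) are illustrative assumptions.

type Handler<M, R> = (message: M) => R;

class SimpleBus {
  private readonly handlers = new Map<Function, Handler<any, any>>();

  // Register exactly one handler per message class.
  register<M extends object, R>(type: new (...args: any[]) => M, handler: Handler<M, R>): void {
    if (this.handlers.has(type)) throw new Error(`Handler already registered for ${type.name}`);
    this.handlers.set(type, handler);
  }

  // Dispatch on the message's constructor.
  execute<R>(message: object): R {
    const handler = this.handlers.get(message.constructor);
    if (!handler) throw new Error(`No handler for ${message.constructor.name}`);
    return handler(message) as R;
  }
}

class CreateUser { constructor(public readonly email: string) {} }
class GetUser { constructor(public readonly id: number) {} }

interface UserRow { id: number; email: string }
const users = new Map<number, UserRow>();

const commandBus = new SimpleBus();
const queryBus = new SimpleBus();

// Command side: mutates state and returns only the new identifier.
commandBus.register(CreateUser, (cmd) => {
  const id = users.size + 1;
  users.set(id, { id, email: cmd.email });
  return id;
});

// Query side: reads state and never mutates it.
queryBus.register(GetUser, (q) => users.get(q.id) ?? null);

const newId = commandBus.execute<number>(new CreateUser('a@example.com'));
const found = queryBus.execute<UserRow | null>(new GetUser(newId));
```

Keeping two separate bus instances makes it impossible to send a command down the query path by accident: the query bus simply has no handler for it.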
```text
Traditional (mixed):       CQRS (separated):
Service                    CommandBus
├── create()               ├── CreateUserCommand
├── findAll()              ├── UpdateUserCommand
├── update()               └── DeleteUserCommand
└── delete()
                           QueryBus
                           ├── GetUserQuery
                           ├── ListUsersQuery
                           └── SearchUsersQuery
```
When to use:
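- Read and write loads differ substantially (read-heavy, write-light)
- The read and write sides need to be optimised independently (for example, a Redis cache on the read side and PostgreSQL on the write side)
- Complex domain logic (DDD scenarios)
- A complete audit log of operations is required
When not to use:
- Simple CRUD (over-engineering; expect roughly 2-3 times as much code)
- The team is unfamiliar with DDD concepts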
2. Installation and Module Setup
```bash
npm install @nestjs/cqrs
```
```typescript
// users/users.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { TypeOrmModule } from '@nestjs/typeorm';
// User, UsersController, UsersService and the handler classes live in their own files (imports omitted).

const CommandHandlers = [CreateUserHandler, UpdateUserHandler, DeleteUserHandler];
const QueryHandlers = [GetUserQueryHandler, ListUsersQueryHandler];
const EventHandlers = [UserCreatedEventHandler, UserDeletedEventHandler];

@Module({
  imports: [CqrsModule, TypeOrmModule.forFeature([User])],
  controllers: [UsersController],
  providers: [
    UsersService,
    ...CommandHandlers,
    ...QueryHandlers,
    ...EventHandlers,
  ],
})
export class UsersModule {}
```
3. Implementing Commands
Defining Commands
```typescript
// users/commands/impl/create-user.command.ts
export class CreateUserCommand {
  constructor(
    public readonly dto: CreateUserDto,
    // Optional request context (trace ID, operator ID, etc.)
    public readonly requestedBy?: number,
  ) {}
}

export class UpdateUserCommand {
  constructor(
    public readonly id: number,
    public readonly dto: UpdateUserDto,
    public readonly requestedBy: number,
  ) {}
}

export class DeleteUserCommand {
  constructor(
    public readonly id: number,
    public readonly requestedBy: number,
  ) {}
}
```
Command Handler
```typescript
// users/commands/handlers/create-user.handler.ts
import { ConflictException, NotFoundException } from '@nestjs/common';
import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
import * as bcrypt from 'bcrypt'; // assumed package; the original omits this import
import { Repository } from 'typeorm';

@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: CreateUserCommand): Promise<User> {
    const { dto } = command;
    // Business validation
    const existing = await this.userRepo.findOne({ where: { email: dto.email } });
    if (existing) throw new ConflictException('Email already registered');
    // Perform the write
    const user = this.userRepo.create({
      ...dto,
      password: await bcrypt.hash(dto.password, 12),
    });
    const saved = await this.userRepo.save(user);
    // Publish a domain event (other modules can subscribe)
    this.eventBus.publish(new UserCreatedEvent(saved.id, saved.email));
    return saved;
  }
}

@CommandHandler(DeleteUserCommand)
export class DeleteUserHandler implements ICommandHandler<DeleteUserCommand> {
  // Dependencies must be injected here too; without this constructor userRepo and eventBus are undefined
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: DeleteUserCommand): Promise<void> {
    const user = await this.userRepo.findOne({ where: { id: command.id } });
    if (!user) throw new NotFoundException('User not found');
    await this.userRepo.softDelete(command.id);
    this.eventBus.publish(new UserDeletedEvent(command.id, command.requestedBy));
  }
}
```
4. Implementing Queries
Defining Queries
```typescript
// users/queries/impl/get-user.query.ts
export class GetUserQuery {
  constructor(public readonly id: number) {}
}

export class ListUsersQuery {
  constructor(
    public readonly page: number = 1,
    public readonly limit: number = 10,
    public readonly role?: string,
    public readonly keyword?: string,
  ) {}
}
```
Query Handler (may use a dedicated read model or cache)
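The caching idea on the read side is the cache-aside pattern: look in the cache first, fall back to the source on a miss, then store the result with a TTL. The sketch below shows it without NestJS or `cache-manager`; `TtlCache`, `loadUserFromDb` and the injectable clock are hypothetical helpers invented for this illustration.

```typescript
// Framework-free sketch of a cache-aside read path with a TTL.
// TtlCache, loadUserFromDb and the injected clock are illustrative assumptions.

interface Entry<V> { value: V; expiresAt: number }

class TtlCache<V> {
  private readonly entries = new Map<string, Entry<V>>();

  // The clock is injectable so expiry can be exercised without waiting.
  constructor(private readonly now: () => number = Date.now) {}

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // expired: drop it and report a miss
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V, ttlMs: number): void {
    this.entries.set(key, { value, expiresAt: this.now() + ttlMs });
  }

  delete(key: string): void {
    this.entries.delete(key);
  }
}

interface UserView { id: number; name: string }

let dbReads = 0;
function loadUserFromDb(id: number): UserView {
  dbReads++; // stands in for a real database query
  return { id, name: `user-${id}` };
}

let fakeTime = 0;
const cache = new TtlCache<UserView>(() => fakeTime);

function getUser(id: number): UserView {
  const key = `user:${id}`;
  const cached = cache.get(key);
  if (cached !== undefined) return cached; // hit
  const user = loadUserFromDb(id);         // miss: fall through to the source
  cache.set(key, user, 60000);             // keep for 60 seconds
  return user;
}

getUser(1);        // miss, reads the database
getUser(1);        // hit, served from the cache
fakeTime = 60001;  // move the clock past the TTL
getUser(1);        // expired, reads the database again
```

On the write side, a command handler that updates or deletes a user would typically call `cache.delete('user:<id>')` so that readers do not see stale data for the full TTL.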
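```typescript
// users/queries/handlers/get-user.handler.ts
import { CACHE_MANAGER } from '@nestjs/cache-manager'; // exported from '@nestjs/common' in older Nest versions
import { Inject, NotFoundException } from '@nestjs/common';
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
import { Cache } from 'cache-manager';
import { Repository } from 'typeorm';

@QueryHandler(GetUserQuery)
export class GetUserQueryHandler implements IQueryHandler<GetUserQuery> {
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
    @Inject(CACHE_MANAGER) private readonly cache: Cache,
  ) {}

  async execute(query: GetUserQuery): Promise<User> {
    // The read side can be cached independently of the write side
    const cacheKey = `user:${query.id}`;
    const cached = await this.cache.get<User>(cacheKey);
    if (cached) return cached;
    const user = await this.userRepo.findOne({
      where: { id: query.id },
      relations: { posts: true },
    });
    if (!user) throw new NotFoundException('User not found');
    // 60 seconds if the TTL unit is milliseconds (cache-manager v5+); older versions use seconds
    await this.cache.set(cacheKey, user, 60000);
    return user;
  }
}

@QueryHandler(ListUsersQuery)
export class ListUsersQueryHandler implements IQueryHandler<ListUsersQuery> {
  // The repository must be injected; the original omitted this constructor
  constructor(
    @InjectRepository(User)
    private readonly userRepo: Repository<User>,
  ) {}

  async execute(query: ListUsersQuery) {
    const { page, limit, role, keyword } = query;
    const qb = this.userRepo.createQueryBuilder('user');
    if (role) qb.andWhere('user.role = :role', { role });
    // Parentheses keep the OR from escaping the surrounding AND conditions
    if (keyword) qb.andWhere('(user.name ILIKE :kw OR user.email ILIKE :kw)', { kw: `%${keyword}%` });
    const [items, total] = await qb
      .orderBy('user.createdAt', 'DESC')
      .skip((page - 1) * limit)
      .take(limit)
      .getManyAndCount();
    return { items, total, page, limit, totalPages: Math.ceil(total / limit) };
  }
}
```
5. Using the Buses from a Controller/Service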
```typescript
// users/users.service.ts
import { Injectable } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';

@Injectable()
export class UsersService {
  constructor(
    private readonly commandBus: CommandBus,
    private readonly queryBus: QueryBus,
  ) {}

  // Commands: change state
  createUser(dto: CreateUserDto, requestedBy?: number) {
    return this.commandBus.execute(new CreateUserCommand(dto, requestedBy));
  }

  updateUser(id: number, dto: UpdateUserDto, requestedBy: number) {
    return this.commandBus.execute(new UpdateUserCommand(id, dto, requestedBy));
  }

  deleteUser(id: number, requestedBy: number) {
    return this.commandBus.execute(new DeleteUserCommand(id, requestedBy));
  }

  // Queries: read data
  getUser(id: number) {
    return this.queryBus.execute(new GetUserQuery(id));
  }

  listUsers(page: number, limit: number, role?: string) {
    return this.queryBus.execute(new ListUsersQuery(page, limit, role));
  }
}
```
6. Domain Events
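A domain event lets the command handler announce that something happened without knowing who reacts to it. As a rough, framework-free illustration of that fan-out (not how `@nestjs/cqrs` implements its `EventBus`), the sketch below delivers one event to several subscribers and isolates their failures; `TinyEventBus`, `UserCreated` and both handlers are hypothetical.

```typescript
// Framework-free sketch of event fan-out with isolated handlers.
// TinyEventBus and the handlers below are illustrative assumptions.

class UserCreated {
  constructor(public readonly userId: number, public readonly email: string) {}
}

type EventHandler<E> = (event: E) => void;

class TinyEventBus {
  private readonly subscribers = new Map<Function, EventHandler<any>[]>();

  subscribe<E extends object>(type: new (...args: any[]) => E, handler: EventHandler<E>): void {
    const list = this.subscribers.get(type) ?? [];
    list.push(handler);
    this.subscribers.set(type, list);
  }

  // Deliver to every subscriber; collect failures instead of aborting the loop.
  publish(event: object): Error[] {
    const failures: Error[] = [];
    for (const handler of this.subscribers.get(event.constructor) ?? []) {
      try {
        handler(event);
      } catch (err) {
        failures.push(err instanceof Error ? err : new Error(String(err)));
      }
    }
    return failures;
  }
}

const bus = new TinyEventBus();
const auditLog: string[] = [];

// First subscriber fails; the second must still run.
bus.subscribe(UserCreated, () => { throw new Error('mail server unavailable'); });
bus.subscribe(UserCreated, (e) => { auditLog.push(`user_created:${e.userId}`); });

const failures = bus.publish(new UserCreated(7, 'a@example.com'));
```

Returning the collected failures, rather than swallowing them, leaves the caller free to log or retry them.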
```typescript
// users/events/impl/user-created.event.ts
export class UserCreatedEvent {
  constructor(
    public readonly userId: number,
    public readonly email: string,
    public readonly createdAt: Date = new Date(),
  ) {}
}

// users/events/handlers/user-created.handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';

@EventsHandler(UserCreatedEvent)
export class UserCreatedEventHandler implements IEventHandler<UserCreatedEvent> {
  constructor(
    private readonly emailService: EmailService,
    private readonly auditService: AuditService,
  ) {}

  async handle(event: UserCreatedEvent) {
    // Independent side effects: allSettled waits for both and never rejects,
    // so one failure does not block the other (inspect the results if failures must be logged)
    await Promise.allSettled([
      this.emailService.sendWelcomeEmail(event.email),
      this.auditService.log('user_created', event.userId),
    ]);
  }
}

// Subscribing to several events with one handler
@EventsHandler(UserCreatedEvent, UserUpdatedEvent)
export class UserAuditHandler implements IEventHandler<UserCreatedEvent | UserUpdatedEvent> {
  // AuditLogService is a placeholder name for whatever provider exposes record()
  constructor(private readonly auditLog: AuditLogService) {}

  handle(event: UserCreatedEvent | UserUpdatedEvent) {
    this.auditLog.record(event);
  }
}
```
7. Event Sourcing
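Event Sourcing is a more radical pattern than CQRS: instead of storing the current state, the system stores only the sequence of events, and the current state is derived by replaying them.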
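Deriving state by replay amounts to a left fold over the event list. A minimal sketch follows, using the same three event types as this section's schematic; the `applyEvent` reducer and the `AccountState` shape are assumptions made for illustration.

```typescript
// Sketch: current state as a left fold over the event history.
// The reducer and state shape are illustrative assumptions.

type AccountEvent =
  | { type: 'UserCreated'; data: { email: string } }
  | { type: 'BalanceDeposit'; data: { amount: number } }
  | { type: 'BalanceWithdraw'; data: { amount: number } };

interface AccountState { email: string; balance: number }

// Pure function: previous state + one event -> next state.
function applyEvent(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'UserCreated':
      return { ...state, email: event.data.email };
    case 'BalanceDeposit':
      return { ...state, balance: state.balance + event.data.amount };
    case 'BalanceWithdraw':
      return { ...state, balance: state.balance - event.data.amount };
  }
}

// Replaying is just reducing the ordered history from an empty state.
function replay(events: AccountEvent[]): AccountState {
  return events.reduce(applyEvent, { email: '', balance: 0 });
}

const state = replay([
  { type: 'UserCreated', data: { email: 'a@example.com' } },
  { type: 'BalanceDeposit', data: { amount: 200 } },
  { type: 'BalanceWithdraw', data: { amount: 100 } },
]);
// state.balance === 100
```

Because `applyEvent` is pure, replaying only a prefix of the history yields the state at that earlier point, which is what makes time travel possible.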
```text
Traditional:     users table  → { id: 1, email: '...', balance: 100 }
Event Sourcing:  events table → [
  { type: 'UserCreated', data: { email: '...' } },
  { type: 'BalanceDeposit', data: { amount: 200 } },
  { type: 'BalanceWithdraw', data: { amount: 100 } },
]
// Current balance = 100, obtained by replaying the events above
```
Aggregate Root
```typescript
// users/aggregates/user.aggregate.ts
import { BadRequestException } from '@nestjs/common';
import { AggregateRoot } from '@nestjs/cqrs';

export class UserAggregate extends AggregateRoot {
  private id!: number;
  private email!: string;
  private balance = 0;
  private version = 0;

  // Factory method (creates a new user); assumes the DTO carries a pre-assigned id
  static create(dto: CreateUserDto): UserAggregate {
    const user = new UserAggregate();
    user.apply(new UserCreatedEvent(dto.id, dto.email)); // apply an event instead of mutating state directly
    return user;
  }

  // Deposit operation
  deposit(amount: number) {
    if (amount <= 0) throw new BadRequestException('Amount must be greater than 0');
    this.apply(new BalanceDepositedEvent(this.id, amount));
  }

  // Event handlers: apply() automatically calls on<EventClassName>
  onUserCreatedEvent(event: UserCreatedEvent) {
    this.id = event.userId;
    this.email = event.email;
    this.version++;
  }

  onBalanceDepositedEvent(event: BalanceDepositedEvent) {
    this.balance += event.amount;
    this.version++;
  }
}
```
Event Store
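Conceptually, an event store is an append-only log keyed by aggregate and version. The in-memory sketch below shows two properties such a store usually needs: an optimistic concurrency check on append, and loading in version order. Its field names follow the `event_store` columns used in this section, while `InMemoryEventStore` and its method signatures are assumptions, not an existing library API.

```typescript
// In-memory sketch of an append-only event store with optimistic concurrency.
// InMemoryEventStore and its method signatures are illustrative assumptions.

interface StoredEvent {
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  payload: unknown;
  version: number;
  occurredAt: Date;
}

interface NewEvent { eventType: string; payload: unknown }

class InMemoryEventStore {
  private readonly rows: StoredEvent[] = [];

  // Append only if the caller has seen the latest version of the stream.
  append(aggregateType: string, aggregateId: string, expectedVersion: number, events: NewEvent[]): number {
    const current = this.currentVersion(aggregateType, aggregateId);
    if (current !== expectedVersion) {
      throw new Error(`Concurrency conflict: expected v${expectedVersion}, found v${current}`);
    }
    let version = current;
    for (const e of events) {
      version += 1;
      this.rows.push({
        aggregateId,
        aggregateType,
        eventType: e.eventType,
        payload: e.payload,
        version,
        occurredAt: new Date(),
      });
    }
    return version;
  }

  // Load one stream, always sorted by version so that replay is deterministic.
  load(aggregateType: string, aggregateId: string): StoredEvent[] {
    return this.rows
      .filter((r) => r.aggregateType === aggregateType && r.aggregateId === aggregateId)
      .sort((a, b) => a.version - b.version);
  }

  private currentVersion(aggregateType: string, aggregateId: string): number {
    return this.load(aggregateType, aggregateId).reduce((max, r) => Math.max(max, r.version), 0);
  }
}

const store = new InMemoryEventStore();
const v1 = store.append('User', '1', 0, [{ eventType: 'UserCreated', payload: { email: 'a@example.com' } }]);
const v2 = store.append('User', '1', v1, [{ eventType: 'BalanceDeposited', payload: { amount: 200 } }]);
```

In a relational store, the same guarantee is commonly obtained with a unique constraint on `(aggregate_type, aggregate_id, version)`, so that a concurrent writer with a stale version fails on insert.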
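```typescript
// Event store table design:
// event_store: { id, aggregate_id, aggregate_type, event_type, payload, version, occurred_at }
import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';

@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    // EventStore is an application-defined service, not part of @nestjs/cqrs
    private readonly eventStore: EventStore,
    private readonly publisher: EventPublisher,
  ) {}

  async execute(command: CreateUserCommand) {
    // mergeObjectContext wires the aggregate to the EventBus; without it commit() publishes nothing
    const aggregate = this.publisher.mergeObjectContext(UserAggregate.create(command.dto));
    // Persist all uncommitted events first
    const events = aggregate.getUncommittedEvents();
    await this.eventStore.save(events, 'User', command.dto.id);
    // Then publish them to the event handlers (which update the read model)
    aggregate.commit();
  }
}
```
8. CQRS vs Event Sourcing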
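| Feature | CQRS | Event Sourcing |
|---|---|---|
| Read/write separation | ✅ | ✅ (usually paired with CQRS) |
| Complete audit log | ❌ (needs extra implementation) | ✅ (built in) |
| Time travel | ❌ | ✅ (replay events) |
| Implementation complexity | Medium | High |
| Query performance | Good (dedicated read model) | Requires additional projections |
| Best suited for | Complex writes, simple reads | Finance, auditing, event-driven systems |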
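Recommendation: adopt CQRS first, and introduce Event Sourcing only when there is a clear need for it. Event Sourcing adds substantial complexity to a system and should not be adopted lightly.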
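The "projection" in the comparison table is a read model kept up to date by consuming events, so that queries never replay history at read time. A minimal sketch of one, where `BalanceProjection` and the `LedgerEvent` shapes are hypothetical:

```typescript
// Sketch of a projection: fold events into a denormalised read model.
// BalanceProjection and the event shapes are illustrative assumptions.

type LedgerEvent =
  | { type: 'UserCreated'; userId: number; email: string }
  | { type: 'BalanceDeposited'; userId: number; amount: number };

interface BalanceRow { userId: number; email: string; balance: number }

class BalanceProjection {
  private readonly rows = new Map<number, BalanceRow>();

  // Called once per event, in order, as events are committed.
  handle(event: LedgerEvent): void {
    switch (event.type) {
      case 'UserCreated':
        this.rows.set(event.userId, { userId: event.userId, email: event.email, balance: 0 });
        break;
      case 'BalanceDeposited': {
        const row = this.rows.get(event.userId);
        if (row) row.balance += event.amount;
        break;
      }
    }
  }

  // Queries read the precomputed row; nothing is replayed at read time.
  get(userId: number): BalanceRow | undefined {
    return this.rows.get(userId);
  }
}

const projection = new BalanceProjection();
const history: LedgerEvent[] = [
  { type: 'UserCreated', userId: 1, email: 'a@example.com' },
  { type: 'BalanceDeposited', userId: 1, amount: 200 },
  { type: 'BalanceDeposited', userId: 1, amount: 50 },
];
history.forEach((e) => projection.handle(e));
const row = projection.get(1);
```

Since a projection is derived entirely from events, it can be dropped and rebuilt from the event store whenever its shape needs to change.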
Runnable demo:
practice/05-microservices: reference implementation of the CQRS CommandHandler/QueryHandler
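Common Errors
| Error | Cause | Fix |
|---|---|---|
| CommandBus.execute() cannot find a handler | The handler is not registered in the module's providers | Register every CommandHandler in the Module's providers |
| Events are replayed in the wrong order | Events were not sorted by version | Use ORDER BY version ASC when querying events |
| State does not change after AggregateRoot apply() | The on[EventName]() method is not implemented | apply(new MyEvent()) automatically calls onMyEvent(); the method name must match the event class name |
| CQRS read/write inconsistency (eventual-consistency lag) | A query runs immediately after the command completes, before the read model is updated | Do not rely on the read model when returning the command result, or apply optimistic updates in the UI layer |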
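The name-matching rule behind the `apply()` row can be reproduced in a few lines. The sketch below imitates the convention without the framework; `MiniAggregate` is written for this illustration and is not the `@nestjs/cqrs` source, and `Wallet`, `Deposited` and `Withdrawn` are hypothetical.

```typescript
// Framework-free imitation of the on<EventName> naming convention.
// MiniAggregate is an illustrative assumption, not the @nestjs/cqrs implementation.

abstract class MiniAggregate {
  private readonly uncommitted: object[] = [];

  protected apply(event: object): void {
    // Look up a method named "on" + the event's class name.
    const handlerName = `on${event.constructor.name}`;
    const handler = (this as unknown as Record<string, unknown>)[handlerName];
    if (typeof handler === 'function') handler.call(this, event);
    // The event is recorded whether or not a handler was found.
    this.uncommitted.push(event);
  }

  getUncommittedEvents(): object[] {
    return [...this.uncommitted];
  }
}

class Deposited { constructor(public readonly amount: number) {} }
class Withdrawn { constructor(public readonly amount: number) {} }

class Wallet extends MiniAggregate {
  balance = 0;

  deposit(amount: number): void { this.apply(new Deposited(amount)); }
  withdraw(amount: number): void { this.apply(new Withdrawn(amount)); }

  // Matches "on" + "Deposited", so apply() finds it.
  onDeposited(event: Deposited): void { this.balance += event.amount; }

  // Does not match "onWithdrawn", so apply() silently skips it.
  handleWithdrawn(event: Withdrawn): void { this.balance -= event.amount; }
}

const wallet = new Wallet();
wallet.deposit(200);
wallet.withdraw(100);
// wallet.balance is still 200: the withdrawal was recorded but never applied.
```

Because the lookup depends on the class name at runtime, a build step that renames classes (for example, aggressive minification) can break the match in the same silent way.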