事务处理
一、为什么需要事务
当多个数据库操作必须作为一个原子单元执行时,需要事务。
典型场景:
- 转账:A 账户扣款 + B 账户加款,必须同时成功或同时失败
- 注册:创建用户 + 发送激活邮件记录,失败时两者都回滚
- 订单:减库存 + 创建订单 + 扣积分,必须原子执行
ACID 属性:
| 属性 | 含义 |
|---|---|
| A(原子性) | 所有操作要么全部成功,要么全部回滚 |
| C(一致性) | 事务前后数据库保持一致状态 |
| I(隔离性) | 并发事务互不干扰 |
| D(持久性) | 提交后的数据永久保存 |
二、TypeORM 事务
方式一:DataSource.transaction()(推荐)
简洁,自动处理提交和回滚:
typescript
@Injectable()
export class TransferService {
constructor(private dataSource: DataSource) {}
async transfer(fromId: number, toId: number, amount: number): Promise<void> {
await this.dataSource.transaction(async (manager: EntityManager) => {
// 在事务内使用 manager,而不是 repo
const sender = await manager.findOne(Account, {
where: { id: fromId },
lock: { mode: 'pessimistic_write' }, // 悲观锁,防止并发冲突
});
if (!sender) throw new NotFoundException('发送方账户不存在');
if (sender.balance < amount) throw new BadRequestException('余额不足');
await manager.decrement(Account, { id: fromId }, 'balance', amount);
await manager.increment(Account, { id: toId }, 'balance', amount);
// 记录转账日志
await manager.save(TransferLog, {
fromId, toId, amount,
createdAt: new Date(),
});
});
// 无异常:自动 COMMIT
// 有异常:自动 ROLLBACK,异常继续向上传播
}
}方式二:QueryRunner(手动控制,最灵活)
适合需要分步提交、保存点(Savepoint)等高级场景:
typescript
async createPostWithTags(dto: CreatePostDto, tagNames: string[]): Promise<Post> {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction('READ COMMITTED'); // 指定隔离级别
try {
// 创建文章
const post = queryRunner.manager.create(Post, {
title: dto.title,
content: dto.content,
authorId: dto.authorId,
});
await queryRunner.manager.save(post);
// 创建或查找标签
const tags: Tag[] = [];
for (const name of tagNames) {
let tag = await queryRunner.manager.findOne(Tag, { where: { name } });
if (!tag) {
tag = queryRunner.manager.create(Tag, { name });
await queryRunner.manager.save(tag);
}
tags.push(tag);
}
// 关联文章和标签
post.tags = tags;
await queryRunner.manager.save(post);
await queryRunner.commitTransaction();
return post;
} catch (err) {
await queryRunner.rollbackTransaction();
throw err; // 重新抛出,让上层感知到错误
} finally {
// ⚠️ 必须释放!否则连接池泄漏
await queryRunner.release();
}
}方式三:在 Repository 中使用事务
typescript
@Injectable()
export class OrderService {
constructor(
private dataSource: DataSource,
@InjectRepository(Order) private orderRepo: Repository<Order>,
@InjectRepository(Product) private productRepo: Repository<Product>,
) {}
async createOrder(dto: CreateOrderDto): Promise<Order> {
return this.dataSource.transaction(async manager => {
// 在事务内创建特定 Repository
const orderRepo = manager.getRepository(Order);
const productRepo = manager.getRepository(Product);
// 检查库存
const product = await productRepo.findOne({
where: { id: dto.productId },
lock: { mode: 'pessimistic_write' },
});
if (!product || product.stock < dto.quantity) {
throw new BadRequestException('库存不足');
}
// 减库存
await productRepo.decrement(
{ id: dto.productId },
'stock',
dto.quantity,
);
// 创建订单
const order = orderRepo.create(dto);
return orderRepo.save(order);
});
}
}事务隔离级别
typescript
// TypeORM 支持四种标准隔离级别
await this.dataSource.transaction('READ UNCOMMITTED', async manager => { ... });
await this.dataSource.transaction('READ COMMITTED', async manager => { ... }); // 默认(PostgreSQL)
await this.dataSource.transaction('REPEATABLE READ', async manager => { ... });
await this.dataSource.transaction('SERIALIZABLE', async manager => { ... });
// QueryRunner 方式
await queryRunner.startTransaction('SERIALIZABLE');三、Prisma 事务
方式一:$transaction 数组(批量原子操作)
操作之间无依赖、无条件判断时使用:
typescript
// 原子性地创建用户和初始化其个人资料
const [user, profile] = await this.prisma.$transaction([
this.prisma.user.create({ data: userDto }),
this.prisma.profile.create({ data: { userId: -1, ...profileDto } }),
// ⚠️ 数组形式:操作并行构建,无法引用前面操作的结果
]);方式二:交互式事务(推荐)
可以使用前面步骤的结果,支持条件判断:
typescript
async transfer(fromId: number, toId: number, amount: number) {
await this.prisma.$transaction(async tx => {
// 读取发送方余额(带锁)
const sender = await tx.account.findUnique({
where: { id: fromId },
});
if (!sender || sender.balance < amount) {
throw new BadRequestException('余额不足');
}
// 原子更新两个账户
await tx.account.update({
where: { id: fromId },
data: { balance: { decrement: amount } },
});
await tx.account.update({
where: { id: toId },
data: { balance: { increment: amount } },
});
// 记录日志(使用前面步骤的数据)
await tx.transferLog.create({
data: { fromId, toId, amount, balance: sender.balance - amount },
});
}, {
maxWait: 5000, // 最长等待获取事务的时间(ms)
timeout: 10000, // 事务最长执行时间(ms)
isolationLevel: Prisma.TransactionIsolationLevel.ReadCommitted,
});
}方式三:$transaction 与批量操作
typescript
// 批量操作的最佳实践:使用 createMany
async bulkCreatePosts(posts: CreatePostDto[]) {
return this.prisma.post.createMany({
data: posts,
skipDuplicates: true, // 跳过重复数据(而不是报错)
});
}
// 复杂批量:在事务中执行多步批量操作
async bulkTransfer(transfers: { fromId: number; toId: number; amount: number }[]) {
await this.prisma.$transaction(async tx => {
for (const { fromId, toId, amount } of transfers) {
const sender = await tx.account.findUnique({ where: { id: fromId } });
if (!sender || sender.balance < amount) {
throw new Error(`账户 ${fromId} 余额不足`);
}
await tx.account.update({ where: { id: fromId }, data: { balance: { decrement: amount } } });
await tx.account.update({ where: { id: toId }, data: { balance: { increment: amount } } });
}
});
}四、分布式事务与 Saga 模式
微服务架构中,跨服务的操作无法用数据库事务保证原子性,需要 Saga 模式:
typescript
// 编排式 Saga:由协调者(Orchestrator)驱动各服务执行
@Injectable()
export class OrderSaga {
async executeCreateOrder(dto: CreateOrderDto) {
let orderId: number;
let paymentId: number;
try {
// Step 1: 创建订单
orderId = await this.orderService.create(dto);
// Step 2: 扣减库存
await this.inventoryService.deduct(dto.productId, dto.quantity);
// Step 3: 处理支付
paymentId = await this.paymentService.charge(dto.userId, dto.amount);
} catch (err) {
// 补偿事务(逆序回滚)
if (paymentId) await this.paymentService.refund(paymentId);
if (orderId) await this.orderService.cancel(orderId);
throw new InternalServerErrorException('订单创建失败,已回滚');
}
}
}五、乐观锁 vs 悲观锁
悲观锁(Pessimistic Lock)
typescript
// TypeORM:查询时锁行,阻止其他事务并发修改
const account = await manager.findOne(Account, {
where: { id },
lock: { mode: 'pessimistic_write' }, // SELECT ... FOR UPDATE
});
// Prisma:通过原生 SQL 实现
const account = await tx.$queryRaw`
SELECT * FROM accounts WHERE id = ${id} FOR UPDATE
`;乐观锁(Optimistic Lock)
typescript
// TypeORM:version 字段自动管理
@Entity()
export class Account {
@VersionColumn() // 每次 save 自动 +1,冲突时抛出 OptimisticLockVersionMismatchError
version: number;
}
try {
await manager.save(account);
} catch (err) {
if (err instanceof OptimisticLockVersionMismatchError) {
throw new ConflictException('数据已被其他用户修改,请刷新后重试');
}
throw err;
}六、事务最佳实践
- 事务粒度要小:事务持有锁的时间越短越好,减少并发冲突
- 不要在事务中做耗时操作:HTTP 请求、文件 IO、发送邮件等应放在事务外
- 始终 release QueryRunner:放在
finally块中,防止连接泄漏 - Prisma 交互式事务设置超时:防止长事务拖垮数据库
- 单元测试中 mock 事务:测试业务逻辑时无需真实事务
typescript
// 测试中 mock DataSource.transaction
jest.spyOn(dataSource, 'transaction').mockImplementation(async (fn) => {
return fn(mockManager);
});可运行 Demo:
practice/03-crud-app/typeorm-version— TypeORM DataSource.transaction 事务示例
常见错误
| 错误 | 原因 | 解决 |
|---|---|---|
| 事务中注入的 Repository 不在事务内 | DI 注入的 repo 不属于当前事务 session | 在事务回调中用 manager.getRepository(Entity) 获取事务内的 repo |
| 事务未提交 / 死锁 | 异常未正确处理导致事务挂起 | 用 try/catch/finally 或让 dataSource.transaction() 自动管理提交/回滚 |
SERIALIZATION_FAILURE 错误 | 并发写入时事务冲突(Serializable 隔离级别) | 捕获后重试,或降低隔离级别到 READ COMMITTED |
| Prisma 嵌套事务报错 | Prisma 不支持原生嵌套事务 | 用 $transaction 数组形式传递多个操作,或用 Savepoint |