Skip to content

事务处理

一、为什么需要事务

当多个数据库操作必须作为一个原子单元执行时,需要事务。

典型场景:

  • 转账:A 账户扣款 + B 账户加款,必须同时成功或同时失败
  • 注册:创建用户 + 发送激活邮件记录,失败时两者都回滚
  • 订单:减库存 + 创建订单 + 扣积分,必须原子执行

ACID 属性:

属性含义
A(原子性)所有操作要么全部成功,要么全部回滚
C(一致性)事务前后数据库保持一致状态
I(隔离性)并发事务互不干扰
D(持久性)提交后的数据永久保存

二、TypeORM 事务

方式一:DataSource.transaction()(推荐)

简洁,自动处理提交和回滚:

typescript
@Injectable()
export class TransferService {
  constructor(private dataSource: DataSource) {}

  async transfer(fromId: number, toId: number, amount: number): Promise<void> {
    await this.dataSource.transaction(async (manager: EntityManager) => {
      // 在事务内使用 manager,而不是 repo
      const sender = await manager.findOne(Account, {
        where: { id: fromId },
        lock: { mode: 'pessimistic_write' },  // 悲观锁,防止并发冲突
      });

      if (!sender) throw new NotFoundException('发送方账户不存在');
      if (sender.balance < amount) throw new BadRequestException('余额不足');

      await manager.decrement(Account, { id: fromId }, 'balance', amount);
      await manager.increment(Account, { id: toId }, 'balance', amount);

      // 记录转账日志
      await manager.save(TransferLog, {
        fromId, toId, amount,
        createdAt: new Date(),
      });
    });
    // 无异常:自动 COMMIT
    // 有异常:自动 ROLLBACK,异常继续向上传播
  }
}

方式二:QueryRunner(手动控制,最灵活)

适合需要分步提交、保存点(Savepoint)等高级场景:

typescript
async createPostWithTags(dto: CreatePostDto, tagNames: string[]): Promise<Post> {
  const queryRunner = this.dataSource.createQueryRunner();

  await queryRunner.connect();
  await queryRunner.startTransaction('READ COMMITTED');  // 指定隔离级别

  try {
    // 创建文章
    const post = queryRunner.manager.create(Post, {
      title: dto.title,
      content: dto.content,
      authorId: dto.authorId,
    });
    await queryRunner.manager.save(post);

    // 创建或查找标签
    const tags: Tag[] = [];
    for (const name of tagNames) {
      let tag = await queryRunner.manager.findOne(Tag, { where: { name } });
      if (!tag) {
        tag = queryRunner.manager.create(Tag, { name });
        await queryRunner.manager.save(tag);
      }
      tags.push(tag);
    }

    // 关联文章和标签
    post.tags = tags;
    await queryRunner.manager.save(post);

    await queryRunner.commitTransaction();
    return post;
  } catch (err) {
    await queryRunner.rollbackTransaction();
    throw err;  // 重新抛出,让上层感知到错误
  } finally {
    // ⚠️ 必须释放!否则连接池泄漏
    await queryRunner.release();
  }
}

方式三:在 Repository 中使用事务

typescript
@Injectable()
export class OrderService {
  constructor(
    private dataSource: DataSource,
    @InjectRepository(Order) private orderRepo: Repository<Order>,
    @InjectRepository(Product) private productRepo: Repository<Product>,
  ) {}

  async createOrder(dto: CreateOrderDto): Promise<Order> {
    return this.dataSource.transaction(async manager => {
      // 在事务内创建特定 Repository
      const orderRepo = manager.getRepository(Order);
      const productRepo = manager.getRepository(Product);

      // 检查库存
      const product = await productRepo.findOne({
        where: { id: dto.productId },
        lock: { mode: 'pessimistic_write' },
      });
      if (!product || product.stock < dto.quantity) {
        throw new BadRequestException('库存不足');
      }

      // 减库存
      await productRepo.decrement(
        { id: dto.productId },
        'stock',
        dto.quantity,
      );

      // 创建订单
      const order = orderRepo.create(dto);
      return orderRepo.save(order);
    });
  }
}

事务隔离级别

typescript
// TypeORM 支持四种标准隔离级别
await this.dataSource.transaction('READ UNCOMMITTED', async manager => { ... });
await this.dataSource.transaction('READ COMMITTED', async manager => { ... });   // 默认(PostgreSQL)
await this.dataSource.transaction('REPEATABLE READ', async manager => { ... });
await this.dataSource.transaction('SERIALIZABLE', async manager => { ... });

// QueryRunner 方式
await queryRunner.startTransaction('SERIALIZABLE');

三、Prisma 事务

方式一:$transaction 数组(批量原子操作)

操作之间无依赖、无条件判断时使用:

typescript
// 原子性地创建用户和初始化其个人资料
const [user, profile] = await this.prisma.$transaction([
  this.prisma.user.create({ data: userDto }),
  this.prisma.profile.create({ data: { userId: -1, ...profileDto } }),
  // ⚠️ 数组形式:操作并行构建,无法引用前面操作的结果
]);

方式二:交互式事务(推荐)

可以使用前面步骤的结果,支持条件判断:

typescript
async transfer(fromId: number, toId: number, amount: number) {
  await this.prisma.$transaction(async tx => {
    // 读取发送方余额(带锁)
    const sender = await tx.account.findUnique({
      where: { id: fromId },
    });

    if (!sender || sender.balance < amount) {
      throw new BadRequestException('余额不足');
    }

    // 原子更新两个账户
    await tx.account.update({
      where: { id: fromId },
      data: { balance: { decrement: amount } },
    });
    await tx.account.update({
      where: { id: toId },
      data: { balance: { increment: amount } },
    });

    // 记录日志(使用前面步骤的数据)
    await tx.transferLog.create({
      data: { fromId, toId, amount, balance: sender.balance - amount },
    });
  }, {
    maxWait: 5000,   // 最长等待获取事务的时间(ms)
    timeout: 10000,  // 事务最长执行时间(ms)
    isolationLevel: Prisma.TransactionIsolationLevel.ReadCommitted,
  });
}

方式三:$transaction 与批量操作

typescript
// 批量操作的最佳实践:使用 createMany
async bulkCreatePosts(posts: CreatePostDto[]) {
  return this.prisma.post.createMany({
    data: posts,
    skipDuplicates: true,  // 跳过重复数据(而不是报错)
  });
}

// 复杂批量:在事务中执行多步批量操作
async bulkTransfer(transfers: { fromId: number; toId: number; amount: number }[]) {
  await this.prisma.$transaction(async tx => {
    for (const { fromId, toId, amount } of transfers) {
      const sender = await tx.account.findUnique({ where: { id: fromId } });
      if (!sender || sender.balance < amount) {
        throw new Error(`账户 ${fromId} 余额不足`);
      }
      await tx.account.update({ where: { id: fromId }, data: { balance: { decrement: amount } } });
      await tx.account.update({ where: { id: toId }, data: { balance: { increment: amount } } });
    }
  });
}

四、分布式事务与 Saga 模式

微服务架构中,跨服务的操作无法用数据库事务保证原子性,需要 Saga 模式

typescript
// 编排式 Saga:由协调者(Orchestrator)驱动各服务执行
@Injectable()
export class OrderSaga {
  async executeCreateOrder(dto: CreateOrderDto) {
    let orderId: number;
    let paymentId: number;

    try {
      // Step 1: 创建订单
      orderId = await this.orderService.create(dto);

      // Step 2: 扣减库存
      await this.inventoryService.deduct(dto.productId, dto.quantity);

      // Step 3: 处理支付
      paymentId = await this.paymentService.charge(dto.userId, dto.amount);

    } catch (err) {
      // 补偿事务(逆序回滚)
      if (paymentId) await this.paymentService.refund(paymentId);
      if (orderId) await this.orderService.cancel(orderId);
      throw new InternalServerErrorException('订单创建失败,已回滚');
    }
  }
}

五、乐观锁 vs 悲观锁

悲观锁(Pessimistic Lock)

typescript
// TypeORM:查询时锁行,阻止其他事务并发修改
const account = await manager.findOne(Account, {
  where: { id },
  lock: { mode: 'pessimistic_write' },  // SELECT ... FOR UPDATE
});

// Prisma:通过原生 SQL 实现
const account = await tx.$queryRaw`
  SELECT * FROM accounts WHERE id = ${id} FOR UPDATE
`;

乐观锁(Optimistic Lock)

typescript
// TypeORM:version 字段自动管理
@Entity()
export class Account {
  @VersionColumn()  // 每次 save 自动 +1,冲突时抛出 OptimisticLockVersionMismatchError
  version: number;
}

try {
  await manager.save(account);
} catch (err) {
  if (err instanceof OptimisticLockVersionMismatchError) {
    throw new ConflictException('数据已被其他用户修改,请刷新后重试');
  }
  throw err;
}

六、事务最佳实践

  1. 事务粒度要小:事务持有锁的时间越短越好,减少并发冲突
  2. 不要在事务中做耗时操作:HTTP 请求、文件 IO、发送邮件等应放在事务外
  3. 始终 release QueryRunner:放在 finally 块中,防止连接泄漏
  4. Prisma 交互式事务设置超时:防止长事务拖垮数据库
  5. 单元测试中 mock 事务:测试业务逻辑时无需真实事务
typescript
// 测试中 mock DataSource.transaction
jest.spyOn(dataSource, 'transaction').mockImplementation(async (fn) => {
  return fn(mockManager);
});

可运行 Demo: practice/03-crud-app/typeorm-version — TypeORM DataSource.transaction 事务示例


常见错误

错误原因解决
事务中注入的 Repository 不在事务内DI 注入的 repo 不属于当前事务 session在事务回调中用 manager.getRepository(Entity) 获取事务内的 repo
事务未提交 / 死锁异常未正确处理导致事务挂起try/catch/finally 或让 dataSource.transaction() 自动管理提交/回滚
SERIALIZATION_FAILURE 错误并发写入时事务冲突(Serializable 隔离级别)捕获后重试,或降低隔离级别到 READ COMMITTED
Prisma 嵌套事务报错Prisma 不支持原生嵌套事务$transaction 数组形式传递多个操作,或用 Savepoint

NestJS 深度学习体系