跳转到内容

事务管理

基于 @PrismaTransactional 的声明式事务管理,自动处理 Prisma 交互式事务的生命周期。

特性

  • 声明式:使用 @PrismaTransactional() 装饰器,代码更简洁
  • 自动管理:自动开启、提交、回滚事务
  • 跨类传播:基于 AsyncLocalStorage,事务自动跨类传播,无需手动传递 tx
  • 嵌套复用:内层 @PrismaTransactional 自动复用外层事务
  • 安全防护:事务内通过 this.prisma 调用 $transaction() 抛出明确错误,防止事务隔离失效
  • 零额外 Proxy:ALS 直接存储 Prisma 原始 tx,不做二次包装,兼容 Prisma 7 runtime proxy
  • 并发安全:基于 AsyncLocalStorage,多请求并发互不干扰

基础用法

typescript
import { Injectable } from '@nestjs/common'
import { PrismaTransactional } from '@maxtan/nest-core'
import { PrismaService } from '../prisma/prisma.service'

@Injectable()
export class UserService {
  constructor(private readonly prisma: PrismaService) {}

  @PrismaTransactional()
  async createUser(dto: CreateUser) {
    // PrismaBaseService 在事务内自动返回 tx,无需手动 usePrismaClient
    const user = await this.prisma.user.create({ data: dto })
    const profile = await this.prisma.profile.create({
      data: { userId: user.id }
    })
    return { user, profile }
  }

  // 异常自动回滚
  @PrismaTransactional()
  async transfer(fromId: string, toId: string, amount: number) {
    const from = await this.prisma.account.findUnique({ where: { id: fromId } })

    if (from.balance < amount) {
      throw new BadRequestException('余额不足') // 自动回滚
    }

    await this.prisma.account.update({
      where: { id: fromId },
      data: { balance: { decrement: amount } }
    })
    await this.prisma.account.update({
      where: { id: toId },
      data: { balance: { increment: amount } }
    })
  }
}

PrismaBaseService 自动事务感知

PrismaBaseService 内置 Proxy,this.prisma.xxx 自动检测事务上下文。 在 @PrismaTransactional 内返回事务客户端(tx),外部返回原始 PrismaClient。 业务代码可直接使用 this.prisma.xxx 而无需显式调用 usePrismaClient()


## 配置选项

```typescript
@PrismaTransactional({
  timeout: 30000,                    // 事务超时(毫秒),默认 30000
  maxWait: 5000,                     // 获取连接最大等待时间,默认 5000
  isolationLevel: 'ReadCommitted',   // 隔离级别
  prismaField: 'prisma'             // PrismaClient 属性名,默认 'prisma'
})
选项类型默认值说明
timeoutnumber30000事务超时(毫秒)
maxWaitnumber5000Prisma 获取连接的最大等待时间
isolationLevelstring-事务隔离级别
prismaFieldstring'prisma'this[prismaField] 获取 PrismaClient

支持的隔离级别:ReadUncommitted | ReadCommitted | RepeatableRead | Serializable | Snapshot

prismaField 自动解析

装饰器从 this[prismaField] 获取对象后,按以下优先级解析真正的 PrismaClient:

优先级this.prisma 的类型解析路径说明
1PrismaBaseService 子类this.prisma.rawClient优先读取 .rawClient(跳过 service proxy,避免 Proxy 叠加递归)
2PrismaClient 实例直接使用this.prisma.$transaction 存在
3其他 holderthis.prisma.client兼容降级:读取 .client 属性

解析顺序很重要

PrismaBaseService 的 Proxy 会将 $transaction 透传到内部 client。 如果先检查 $transaction 存在,会错误地把 service proxy 本身当作 PrismaClient, 导致 Prisma 内部所有属性访问都经过 service proxy,与 Prisma 7 runtime proxy 叠加后形成递归。 因此必须先检测 rawClient,确保拿到的是原始 PrismaClient。

typescript
// ✅ PrismaBaseService 子类注入,client 自动事务感知
@Injectable()
export class OrderService {
  constructor(private readonly prisma: PrismaService) {} // PrismaBaseService 子类

  @PrismaTransactional()
  async createOrder(dto: CreateOrder) {
    // 直接用 this.prisma,事务内自动返回 tx
    return this.prisma.order.create({ data: dto })
  }
}

事务内获取客户端

usePrismaClient

显式获取事务感知客户端。如果使用 PrismaBaseService,其内置 Proxy 已包含此逻辑,通常无需显式调用。 适用于直接注入 PrismaClient(非 PrismaBaseService)的场景:

typescript
import { usePrismaClient } from '@maxtan/nest-core'

@PrismaTransactional()
async createUser(dto: CreateUser) {
  // 事务内 → 返回 tx(所有操作共享事务)
  // 事务外 → 返回原始 PrismaClient
  const client = usePrismaClient(this.prisma.rawClient)
  return client.user.create({ data: dto })
}

跨类事务传播

基于 AsyncLocalStorage,调用其他 Service 的 @PrismaTransactional 方法时,自动复用同一事务:

typescript
// UserService
@PrismaTransactional()
async createUserWithProfile(dto: CreateUser) {
  const user = await this.prisma.user.create({ data: dto })

  // profileService 的方法自动复用当前事务,无需传递 tx
  await this.profileService.createProfile(user.id)
  // 如果 createProfile 抛出异常,user 创建也会回滚
  return user
}

// ProfileService(另一个类)
@PrismaTransactional()
async createProfile(userId: string) {
  // 自动检测到外层事务 → 复用(不创建新事务)
  return this.prisma.profile.create({ data: { userId } })
}

嵌套检测

① AsyncLocalStorage 检测 — 同一异步上下文中已有事务 → 复用
② 无事务 → 创建新的交互式事务 + ALS 绑定

批量查询的正确写法

事务内禁止调用 $transaction

@PrismaTransactional 方法内调用 $transaction() 会抛出明确错误,防止以下问题:

  • 原子性丢失$transaction([...]) 创建独立事务,外层回滚时已提交的数据不会回滚
  • 死锁风险:外层事务持有锁,内层 $transaction 尝试获取同一锁 → 死锁
  • 连接池耗尽:嵌套 $transaction 各占一个连接,可能耗尽连接池
typescript
@PrismaTransactional()
async listWithCount(where: any) {
  // ✔ this.prisma 在事务内自动返回 tx

  // ❌ 事务内禁止调用 $transaction() — 会抛出错误
  // const [records, total] = await this.prisma.$transaction([...])

  // ✅ 改用 Promise.all — 已在同一事务中
  const [records, total] = await Promise.all([
    this.prisma.user.findMany({ where }),
    this.prisma.user.count({ where })
  ])
  return { records, total }
}

工作原理

请求 → @PrismaTransactional()

          ├─ ① ALS 检测:已有事务 → 复用(跳过创建)
          └─ ② 无事务:
               ├─ 解析 PrismaClient(优先 rawClient,跳过 service proxy)
               ├─ prisma.$transaction(async (tx) => { ... })
               └─ transactionStore.run({ tx }, () => originalMethod(...))

                    ├─ PrismaBaseService.client / usePrismaClient() → 返回 tx
                    ├─ PrismaBaseService proxy → 事务内拦截 $transaction/$connect/$disconnect
                    └─ 跨类 @PrismaTransactional 方法 → ALS 检测到 → 自动复用
设计说明:为什么不对 tx 做二次 Proxy 包装

早期版本通过 createSafeTxProxy 在 Prisma 原始 tx 外再包一层 Proxy,用于拦截 $transaction 等危险调用。 但 Prisma 7 的事务客户端本身就是 runtime proxy,与 PrismaBaseService 的 Proxy 叠加后, 形成三层 Proxy(service proxy → safe tx proxy → Prisma runtime proxy), Reflect.get 的 receiver 在层间传递时触发递归属性解析,最终导致 Maximum call stack size exceeded

当前方案:

  • ALS 直接存原始 tx——零额外 Proxy 层,Prisma 内部反射不受干扰
  • 安全拦截移到 PrismaBaseService proxy——只在开发者经由 this.prisma.xxx 访问时生效, 不侵入 Prisma 对象本身
  • isInTransaction() 函数——PrismaBaseService proxy 通过此函数检测事务状态,按需拦截

注意事项

  1. 在 Service 层使用,不要在 Controller 中使用
  2. 调用时无需额外参数await this.userService.createUser(dto)
  3. 事务内直接用 this.prismaPrismaBaseService 自动返回事务客户端
  4. 事务内不要调用 $transaction(),批量查询改用 Promise.all()
  5. 避免大事务:事务越小越好,不要在事务中做 HTTP 调用或重计算
  6. 不要在 Prisma tx 对象上再包 Proxy:Prisma 7 的 tx 本身是 runtime proxy,叠加会导致栈溢出

与 PrismaRepository 配合

PrismaRepository 内部通过 usePrismaClient 动态获取模型委托,因此 Repository 方法天然参与 @PrismaTransactional 事务:

typescript
@Injectable()
export class UserRepository extends PrismaRepository<User> {
  constructor(private prisma: PrismaService) {
    super(prisma, 'user')
  }
}

@Injectable()
export class UserService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly userRepo: UserRepository,
    private readonly profileService: ProfileService
  ) {}

  @PrismaTransactional()
  async createUserWithProfile(dto: CreateUser) {
    // ✅ Repository 方法自动参与事务
    const user = await this.userRepo.create(dto)
    // ✅ 跨类调用自动复用事务
    await this.profileService.createProfile(user.id)
    return user
  }
}

推荐

简单 CRUD 场景建议 Service 直接操作 this.prisma,无需 DAO 层。 DAO 适合需要复用查询逻辑、统一排序/选择策略的场景。

最佳实践

  1. 只在 Service 层使用:事务管理是业务逻辑,不应在 Controller 中开启事务
  2. 直接用 this.prismaPrismaBaseService 自动事务感知,跨类调用自动传播
  3. 转账场景为典型:多表写入、余额扣减、订单状态变更等需保证原子性的场景使用事务
  4. 合理超时:默认 30s,根据业务复杂度调整,避免长时间锁定资源
  5. 批量查询用 Promise.all:事务内的并行查询不需要 $transaction([])Promise.all 即可

多数据源事务

在多数据源场景下,每个 PrismaBaseService 子类持有独立的 PrismaClient。使用 @PrismaTransactional 时需通过 prismaField 明确指定目标数据源:

typescript
@Injectable()
export class OrderService {
  constructor(
    private readonly primaryDb: PrimaryPrismaService,
    private readonly secondaryDb: SecondaryPrismaService
  ) {}

  /** 主库事务:创建订单 + 扣减库存 */
  @PrismaTransactional({ prismaField: 'primaryDb' })
  async createOrder(dto: CreateOrder) {
    const order = await this.primaryDb.order.create({ data: dto })
    await this.primaryDb.inventory.update({
      where: { id: dto.productId },
      data: { stock: { decrement: dto.quantity } }
    })
    return order
  }

  /** 从库事务:批量写入分析日志 */
  @PrismaTransactional({ prismaField: 'secondaryDb' })
  async batchWriteLog(logs: CreateLog[]) {
    for (const log of logs) {
      await this.secondaryDb.analyticsLog.create({ data: log })
    }
  }
}

多数据源事务规则

规则说明
prismaField 必须指定默认值是 'prisma',多数据源时属性名不同,必须显式配置
单事务单数据源一个 @PrismaTransactional 只能操作一个数据库
跨类传播仅限同一数据源被调用 Service 的 @PrismaTransactional 复用外层事务时,必须使用同一 PrismaClient
跨库不支持分布式事务需要跨库原子性时,使用 Saga 补偿模式或最终一致性

跨库操作的正确写法

typescript
@Injectable()
export class TransferService {
  constructor(
    private readonly primaryDb: PrimaryPrismaService,
    private readonly secondaryDb: SecondaryPrismaService
  ) {}

  /**
   * 跨库操作:主库扣减 + 从库记录
   * 不能用单个事务覆盖两个库,需分别处理
   */
  async transferWithLog(dto: TransferDto) {
    // ① 主库事务:保证扣减和增加的原子性
    const result = await this.doTransfer(dto)

    // ② 从库写入日志(非事务内,允许降级)
    try {
      await this.secondaryDb.transferLog.create({
        data: { ...dto, status: 'SUCCESS', transferredAt: new Date() }
      })
    } catch (e) {
      // 日志写入失败不影响主业务,记录错误后续补偿
      console.error('日志写入失败,等待补偿', e)
    }

    return result
  }

  @PrismaTransactional({ prismaField: 'primaryDb' })
  private async doTransfer(dto: TransferDto) {
    await this.primaryDb.account.update({
      where: { id: dto.fromId },
      data: { balance: { decrement: dto.amount } }
    })
    await this.primaryDb.account.update({
      where: { id: dto.toId },
      data: { balance: { increment: dto.amount } }
    })
    return { success: true }
  }
}

高级场景

事务内的条件分支

typescript
@PrismaTransactional()
async processOrder(dto: ProcessOrder) {
  const order = await this.prisma.order.findUnique({
    where: { id: dto.orderId }
  })
  if (!order) throw new AppException(MSG.NOT_FOUND_DATA)

  if (dto.action === 'approve') {
    // 审批通过:更新状态 + 扣减库存
    await this.prisma.order.update({
      where: { id: order.id },
      data: { status: 'APPROVED' }
    })
    await this.prisma.inventory.update({
      where: { id: order.productId },
      data: { stock: { decrement: order.quantity } }
    })
  } else {
    // 审批拒绝:仅更新状态
    await this.prisma.order.update({
      where: { id: order.id },
      data: { status: 'REJECTED', reason: dto.reason }
    })
  }
  // 无论哪个分支,异常都会回滚所有操作
}

事务内的循环批量操作

typescript
@PrismaTransactional({ timeout: 60000 }) // 批量操作适当增大超时
async batchImport(items: CreateProduct[]) {
  const results = []
  for (const item of items) {
    // 逐条创建,全部在同一事务中
    const product = await this.prisma.product.create({ data: item })
    results.push(product)
  }
  // 任一条失败 → 全部回滚
  return results
}

事务 + Repository 配合

typescript
@Injectable()
export class UserService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly userRepo: UserRepository,
    private readonly profileRepo: ProfileRepository
  ) {}

  @PrismaTransactional()
  async createUserWithProfile(dto: CreateUser) {
    // ✅ Repository 方法自动参与事务
    const user = await this.userRepo.create(dto)
    await this.profileRepo.create({ userId: user.id, bio: '' })

    // ✅ 直接操作 prisma 也在同一事务中
    await this.prisma.auditLog.create({
      data: { action: 'CREATE_USER', targetId: user.id }
    })
    return user
  }
}

事务隔离级别选择指南

隔离级别脏读不可重复读幻读适用场景
ReadUncommitted可能可能可能极少使用,仅性能极端场景
ReadCommitted可能可能默认推荐:大多数业务场景
RepeatableRead可能金融余额、库存扣减等需一致性快照
Serializable最高一致性,性能最低,仅关键场景
typescript
// 金融转账:使用 RepeatableRead 防止并发余额不一致
@PrismaTransactional({ isolationLevel: 'RepeatableRead' })
async transfer(fromId: string, toId: string, amount: number) {
  const from = await this.prisma.account.findUnique({ where: { id: fromId } })
  if (from.balance < amount) throw new AppException('余额不足')
  // ...
}

// 关键账务结算:使用 Serializable
@PrismaTransactional({ isolationLevel: 'Serializable', timeout: 60000 })
async settle(periodId: string) {
  // 严格串行化,防止并发结算
}

相关文档