外观
事务管理
基于 @PrismaTransactional 的声明式事务管理,自动处理 Prisma 交互式事务的生命周期。
特性
- 声明式:使用
@PrismaTransactional()装饰器,代码更简洁 - 自动管理:自动开启、提交、回滚事务
- 跨类传播:基于
AsyncLocalStorage,事务自动跨类传播,无需手动传递 tx - 嵌套复用:内层
@PrismaTransactional自动复用外层事务 - 安全防护:事务内通过
this.prisma调用$transaction()抛出明确错误,防止事务隔离失效 - 零额外 Proxy:ALS 直接存储 Prisma 原始 tx,不做二次包装,兼容 Prisma 7 runtime proxy
- 并发安全:基于
AsyncLocalStorage,多请求并发互不干扰
基础用法
typescript
import { Injectable } from '@nestjs/common'
import { PrismaTransactional } from '@maxtan/nest-core'
import { PrismaService } from '../prisma/prisma.service'
@Injectable()
export class UserService {
constructor(private readonly prisma: PrismaService) {}
@PrismaTransactional()
async createUser(dto: CreateUser) {
// PrismaBaseService 在事务内自动返回 tx,无需手动 usePrismaClient
const user = await this.prisma.user.create({ data: dto })
const profile = await this.prisma.profile.create({
data: { userId: user.id }
})
return { user, profile }
}
// 异常自动回滚
@PrismaTransactional()
async transfer(fromId: string, toId: string, amount: number) {
const from = await this.prisma.account.findUnique({ where: { id: fromId } })
if (from.balance < amount) {
throw new BadRequestException('余额不足') // 自动回滚
}
await this.prisma.account.update({
where: { id: fromId },
data: { balance: { decrement: amount } }
})
await this.prisma.account.update({
where: { id: toId },
data: { balance: { increment: amount } }
})
}
}PrismaBaseService 自动事务感知
PrismaBaseService 内置 Proxy,this.prisma.xxx 自动检测事务上下文。 在 @PrismaTransactional 内返回事务客户端(tx),外部返回原始 PrismaClient。 业务代码可直接使用 this.prisma.xxx 而无需显式调用 usePrismaClient()。
## 配置选项
```typescript
@PrismaTransactional({
timeout: 30000, // 事务超时(毫秒),默认 30000
maxWait: 5000, // 获取连接最大等待时间,默认 5000
isolationLevel: 'ReadCommitted', // 隔离级别
prismaField: 'prisma' // PrismaClient 属性名,默认 'prisma'
})| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
timeout | number | 30000 | 事务超时(毫秒) |
maxWait | number | 5000 | Prisma 获取连接的最大等待时间 |
isolationLevel | string | - | 事务隔离级别 |
prismaField | string | 'prisma' | this[prismaField] 获取 PrismaClient |
支持的隔离级别:ReadUncommitted | ReadCommitted | RepeatableRead | Serializable | Snapshot
prismaField 自动解析
装饰器从 this[prismaField] 获取对象后,按以下优先级解析真正的 PrismaClient:
| 优先级 | this.prisma 的类型 | 解析路径 | 说明 |
|---|---|---|---|
| 1 | PrismaBaseService 子类 | this.prisma.rawClient | 优先读取 .rawClient(跳过 service proxy,避免 Proxy 叠加递归) |
| 2 | PrismaClient 实例 | 直接使用 | this.prisma.$transaction 存在 |
| 3 | 其他 holder | this.prisma.client | 兼容降级:读取 .client 属性 |
解析顺序很重要
PrismaBaseService 的 Proxy 会将 $transaction 透传到内部 client。 如果先检查 $transaction 存在,会错误地把 service proxy 本身当作 PrismaClient, 导致 Prisma 内部所有属性访问都经过 service proxy,与 Prisma 7 runtime proxy 叠加后形成递归。 因此必须先检测 rawClient,确保拿到的是原始 PrismaClient。
typescript
// ✅ PrismaBaseService 子类注入,client 自动事务感知
@Injectable()
export class OrderService {
constructor(private readonly prisma: PrismaService) {} // PrismaBaseService 子类
@PrismaTransactional()
async createOrder(dto: CreateOrder) {
// 直接用 this.prisma,事务内自动返回 tx
return this.prisma.order.create({ data: dto })
}
}事务内获取客户端
usePrismaClient
显式获取事务感知客户端。如果使用 PrismaBaseService,其内置 Proxy 已包含此逻辑,通常无需显式调用。 适用于直接注入 PrismaClient(非 PrismaBaseService)的场景:
typescript
import { usePrismaClient } from '@maxtan/nest-core'
@PrismaTransactional()
async createUser(dto: CreateUser) {
// 事务内 → 返回 tx(所有操作共享事务)
// 事务外 → 返回原始 PrismaClient
const client = usePrismaClient(this.prisma.rawClient)
return client.user.create({ data: dto })
}跨类事务传播
基于 AsyncLocalStorage,调用其他 Service 的 @PrismaTransactional 方法时,自动复用同一事务:
typescript
// UserService
@PrismaTransactional()
async createUserWithProfile(dto: CreateUser) {
const user = await this.prisma.user.create({ data: dto })
// profileService 的方法自动复用当前事务,无需传递 tx
await this.profileService.createProfile(user.id)
// 如果 createProfile 抛出异常,user 创建也会回滚
return user
}
// ProfileService(另一个类)
@PrismaTransactional()
async createProfile(userId: string) {
// 自动检测到外层事务 → 复用(不创建新事务)
return this.prisma.profile.create({ data: { userId } })
}嵌套检测
① AsyncLocalStorage 检测 — 同一异步上下文中已有事务 → 复用
② 无事务 → 创建新的交互式事务 + ALS 绑定批量查询的正确写法
事务内禁止调用 $transaction
在 @PrismaTransactional 方法内调用 $transaction() 会抛出明确错误,防止以下问题:
- 原子性丢失:
$transaction([...])创建独立事务,外层回滚时已提交的数据不会回滚 - 死锁风险:外层事务持有锁,内层
$transaction尝试获取同一锁 → 死锁 - 连接池耗尽:嵌套
$transaction各占一个连接,可能耗尽连接池
typescript
@PrismaTransactional()
async listWithCount(where: any) {
// ✔ this.prisma 在事务内自动返回 tx
// ❌ 事务内禁止调用 $transaction() — 会抛出错误
// const [records, total] = await this.prisma.$transaction([...])
// ✅ 改用 Promise.all — 已在同一事务中
const [records, total] = await Promise.all([
this.prisma.user.findMany({ where }),
this.prisma.user.count({ where })
])
return { records, total }
}工作原理
请求 → @PrismaTransactional()
│
├─ ① ALS 检测:已有事务 → 复用(跳过创建)
└─ ② 无事务:
├─ 解析 PrismaClient(优先 rawClient,跳过 service proxy)
├─ prisma.$transaction(async (tx) => { ... })
└─ transactionStore.run({ tx }, () => originalMethod(...))
│
├─ PrismaBaseService.client / usePrismaClient() → 返回 tx
├─ PrismaBaseService proxy → 事务内拦截 $transaction/$connect/$disconnect
└─ 跨类 @PrismaTransactional 方法 → ALS 检测到 → 自动复用设计说明:为什么不对 tx 做二次 Proxy 包装
早期版本通过 createSafeTxProxy 在 Prisma 原始 tx 外再包一层 Proxy,用于拦截 $transaction 等危险调用。 但 Prisma 7 的事务客户端本身就是 runtime proxy,与 PrismaBaseService 的 Proxy 叠加后, 形成三层 Proxy(service proxy → safe tx proxy → Prisma runtime proxy), Reflect.get 的 receiver 在层间传递时触发递归属性解析,最终导致 Maximum call stack size exceeded。
当前方案:
- ALS 直接存原始 tx——零额外 Proxy 层,Prisma 内部反射不受干扰
- 安全拦截移到 PrismaBaseService proxy——只在开发者经由
this.prisma.xxx访问时生效, 不侵入 Prisma 对象本身 isInTransaction()函数——PrismaBaseService proxy 通过此函数检测事务状态,按需拦截
注意事项
- 在 Service 层使用,不要在 Controller 中使用
- 调用时无需额外参数:
await this.userService.createUser(dto) - 事务内直接用
this.prisma,PrismaBaseService自动返回事务客户端 - 事务内不要调用
$transaction(),批量查询改用Promise.all() - 避免大事务:事务越小越好,不要在事务中做 HTTP 调用或重计算
- 不要在 Prisma tx 对象上再包 Proxy:Prisma 7 的 tx 本身是 runtime proxy,叠加会导致栈溢出
与 PrismaRepository 配合
PrismaRepository 内部通过 usePrismaClient 动态获取模型委托,因此 Repository 方法天然参与 @PrismaTransactional 事务:
typescript
@Injectable()
export class UserRepository extends PrismaRepository<User> {
constructor(private prisma: PrismaService) {
super(prisma, 'user')
}
}
@Injectable()
export class UserService {
constructor(
private readonly prisma: PrismaService,
private readonly userRepo: UserRepository,
private readonly profileService: ProfileService
) {}
@PrismaTransactional()
async createUserWithProfile(dto: CreateUser) {
// ✅ Repository 方法自动参与事务
const user = await this.userRepo.create(dto)
// ✅ 跨类调用自动复用事务
await this.profileService.createProfile(user.id)
return user
}
}推荐
简单 CRUD 场景建议 Service 直接操作 this.prisma,无需 DAO 层。 DAO 适合需要复用查询逻辑、统一排序/选择策略的场景。
最佳实践
- 只在 Service 层使用:事务管理是业务逻辑,不应在 Controller 中开启事务
- 直接用
this.prisma:PrismaBaseService自动事务感知,跨类调用自动传播 - 转账场景为典型:多表写入、余额扣减、订单状态变更等需保证原子性的场景使用事务
- 合理超时:默认 30s,根据业务复杂度调整,避免长时间锁定资源
- 批量查询用 Promise.all:事务内的并行查询不需要
$transaction([]),Promise.all即可
多数据源事务
在多数据源场景下,每个 PrismaBaseService 子类持有独立的 PrismaClient。使用 @PrismaTransactional 时需通过 prismaField 明确指定目标数据源:
typescript
@Injectable()
export class OrderService {
constructor(
private readonly primaryDb: PrimaryPrismaService,
private readonly secondaryDb: SecondaryPrismaService
) {}
/** 主库事务:创建订单 + 扣减库存 */
@PrismaTransactional({ prismaField: 'primaryDb' })
async createOrder(dto: CreateOrder) {
const order = await this.primaryDb.order.create({ data: dto })
await this.primaryDb.inventory.update({
where: { id: dto.productId },
data: { stock: { decrement: dto.quantity } }
})
return order
}
/** 从库事务:批量写入分析日志 */
@PrismaTransactional({ prismaField: 'secondaryDb' })
async batchWriteLog(logs: CreateLog[]) {
for (const log of logs) {
await this.secondaryDb.analyticsLog.create({ data: log })
}
}
}多数据源事务规则
| 规则 | 说明 |
|---|---|
prismaField 必须指定 | 默认值是 'prisma',多数据源时属性名不同,必须显式配置 |
| 单事务单数据源 | 一个 @PrismaTransactional 只能操作一个数据库 |
| 跨类传播仅限同一数据源 | 被调用 Service 的 @PrismaTransactional 复用外层事务时,必须使用同一 PrismaClient |
| 跨库不支持分布式事务 | 需要跨库原子性时,使用 Saga 补偿模式或最终一致性 |
跨库操作的正确写法
typescript
@Injectable()
export class TransferService {
constructor(
private readonly primaryDb: PrimaryPrismaService,
private readonly secondaryDb: SecondaryPrismaService
) {}
/**
* 跨库操作:主库扣减 + 从库记录
* 不能用单个事务覆盖两个库,需分别处理
*/
async transferWithLog(dto: TransferDto) {
// ① 主库事务:保证扣减和增加的原子性
const result = await this.doTransfer(dto)
// ② 从库写入日志(非事务内,允许降级)
try {
await this.secondaryDb.transferLog.create({
data: { ...dto, status: 'SUCCESS', transferredAt: new Date() }
})
} catch (e) {
// 日志写入失败不影响主业务,记录错误后续补偿
console.error('日志写入失败,等待补偿', e)
}
return result
}
@PrismaTransactional({ prismaField: 'primaryDb' })
private async doTransfer(dto: TransferDto) {
await this.primaryDb.account.update({
where: { id: dto.fromId },
data: { balance: { decrement: dto.amount } }
})
await this.primaryDb.account.update({
where: { id: dto.toId },
data: { balance: { increment: dto.amount } }
})
return { success: true }
}
}高级场景
事务内的条件分支
typescript
@PrismaTransactional()
async processOrder(dto: ProcessOrder) {
const order = await this.prisma.order.findUnique({
where: { id: dto.orderId }
})
if (!order) throw new AppException(MSG.NOT_FOUND_DATA)
if (dto.action === 'approve') {
// 审批通过:更新状态 + 扣减库存
await this.prisma.order.update({
where: { id: order.id },
data: { status: 'APPROVED' }
})
await this.prisma.inventory.update({
where: { id: order.productId },
data: { stock: { decrement: order.quantity } }
})
} else {
// 审批拒绝:仅更新状态
await this.prisma.order.update({
where: { id: order.id },
data: { status: 'REJECTED', reason: dto.reason }
})
}
// 无论哪个分支,异常都会回滚所有操作
}事务内的循环批量操作
typescript
@PrismaTransactional({ timeout: 60000 }) // 批量操作适当增大超时
async batchImport(items: CreateProduct[]) {
const results = []
for (const item of items) {
// 逐条创建,全部在同一事务中
const product = await this.prisma.product.create({ data: item })
results.push(product)
}
// 任一条失败 → 全部回滚
return results
}事务 + Repository 配合
typescript
@Injectable()
export class UserService {
constructor(
private readonly prisma: PrismaService,
private readonly userRepo: UserRepository,
private readonly profileRepo: ProfileRepository
) {}
@PrismaTransactional()
async createUserWithProfile(dto: CreateUser) {
// ✅ Repository 方法自动参与事务
const user = await this.userRepo.create(dto)
await this.profileRepo.create({ userId: user.id, bio: '' })
// ✅ 直接操作 prisma 也在同一事务中
await this.prisma.auditLog.create({
data: { action: 'CREATE_USER', targetId: user.id }
})
return user
}
}事务隔离级别选择指南
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 适用场景 |
|---|---|---|---|---|
ReadUncommitted | 可能 | 可能 | 可能 | 极少使用,仅性能极端场景 |
ReadCommitted | ✘ | 可能 | 可能 | 默认推荐:大多数业务场景 |
RepeatableRead | ✘ | ✘ | 可能 | 金融余额、库存扣减等需一致性快照 |
Serializable | ✘ | ✘ | ✘ | 最高一致性,性能最低,仅关键场景 |
typescript
// 金融转账:使用 RepeatableRead 防止并发余额不一致
@PrismaTransactional({ isolationLevel: 'RepeatableRead' })
async transfer(fromId: string, toId: string, amount: number) {
const from = await this.prisma.account.findUnique({ where: { id: fromId } })
if (from.balance < amount) throw new AppException('余额不足')
// ...
}
// 关键账务结算:使用 Serializable
@PrismaTransactional({ isolationLevel: 'Serializable', timeout: 60000 })
async settle(periodId: string) {
// 严格串行化,防止并发结算
}