分布式事务
什么是分布式事务
在微服务架构中,一个业务操作往往需要跨越多个服务和数据库。分布式事务就是为了保证这些跨服务、跨数据库的操作要么全部成功,要么全部失败,从而保证数据的一致性。
为什么需要分布式事务
在单体应用中,我们可以使用数据库的本地事务(ACID)来保证数据一致性。但在微服务架构下:
- 每个服务有独立的数据库
- 服务之间通过网络通信
- 无法使用传统的数据库事务来保证一致性
典型场景:
- 电商下单:扣减库存 + 创建订单 + 扣减余额
- 转账业务:A账户扣款 + B账户加款
- 积分系统:消费扣款 + 积分增加
分布式事务理论基础
CAP 理论
- C (Consistency): 一致性,所有节点同一时间看到相同数据
- A (Availability): 可用性,系统持续提供服务
- P (Partition Tolerance): 分区容错性,网络分区时系统继续运行
结论: 在分布式系统中,P 是必须保证的,只能在 C 和 A 之间权衡。
BASE 理论
- BA (Basically Available): 基本可用
- S (Soft State): 软状态,允许中间状态
- E (Eventually Consistent): 最终一致性
BASE 理论是对 CAP 中一致性和可用性权衡的结果,通过牺牲强一致性来获得可用性。
分布式事务解决方案
1. 两阶段提交(2PC)
流程:
准备阶段(Prepare):
- 协调者向所有参与者发送准备请求
- 参与者执行事务操作但不提交,返回 Yes/No
提交阶段(Commit):
- 如果所有参与者返回 Yes,协调者发送提交请求
- 如果有参与者返回 No,协调者发送回滚请求
优点:
- 强一致性保证
- 实现相对简单
缺点:
- 同步阻塞,性能较差
- 单点故障(协调者)
- 数据不一致风险(网络分区时)
2. TCC(Try-Confirm-Cancel)
TCC 是一种补偿型事务,将业务操作分为三个阶段:
Try 阶段:
- 尝试执行业务
- 完成所有业务检查
- 预留必需的业务资源
Confirm 阶段:
- 确认执行业务
- 使用 Try 阶段预留的资源
- 操作满足幂等性
Cancel 阶段:
- 取消执行业务
- 释放 Try 阶段预留的资源
- 操作满足幂等性
示例:转账业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Try 阶段
func TransferTry(from, to string, amount int) error {
// 冻结转出账户金额
if err := FreezeBalance(from, amount); err != nil {
return err
}
// 预留转入账户额度
if err := ReserveBalance(to, amount); err != nil {
UnfreezeBalance(from, amount)
return err
}
return nil
}
// Confirm 阶段
func TransferConfirm(from, to string, amount int) error {
// 扣减冻结金额
DeductFrozenBalance(from, amount)
// 增加账户余额
AddBalance(to, amount)
return nil
}
// Cancel 阶段
func TransferCancel(from, to string, amount int) error {
// 解冻金额
UnfreezeBalance(from, amount)
// 取消预留
CancelReserve(to, amount)
return nil
}
优点:
- 性能较好,无长时间锁定
- 不依赖数据库事务
缺点:
- 业务侵入性强,需要实现三个接口
- 开发成本高
3. SAGA 模式
SAGA 将长事务拆分为多个本地短事务,每个短事务都有对应的补偿操作。
两种实现方式:
事件编排(Choreography):
- 去中心化,每个服务监听事件并触发下一步
- 服务间通过事件总线通信
命令协调(Orchestration):
- 中心化,由协调器控制事务流程
- 协调器负责调用各个服务
示例:订单流程
1
2
3
4
5
// 正向操作
CreateOrder -> DeductInventory -> DeductBalance -> SendNotification
// 补偿操作(失败时反向执行)
CancelOrder <- RestoreInventory <- RefundBalance <- CancelNotification
优点:
- 长事务支持
- 不需要锁定资源
- 性能好
缺点:
- 需要实现补偿逻辑
- 无法保证隔离性
- 可能出现中间状态
4. 本地消息表
通过本地消息表和消息队列实现最终一致性。
流程:
- 在本地事务中执行业务操作并插入消息表
- 定时任务扫描消息表,发送消息到 MQ
- 消费者消费消息,执行业务操作
- 确认消费后,更新消息状态
优点:
- 实现简单
- 可靠性高
缺点:
- 业务侵入
- 需要额外的消息表
5. 可靠消息最终一致性
基于 MQ 的事务消息实现。
流程:
- 发送半消息(Prepare)到 MQ
- 执行本地事务
- 根据本地事务结果提交或回滚消息
- 消费者消费消息,执行业务
优点:
- 解耦性好
- 性能高
缺点:
- 依赖 MQ 的事务消息功能
- 最终一致性,有延迟
DTM 框架详解
DTM(Distributed Transaction Manager)是一个开源的分布式事务管理框架,支持多种事务模式。
DTM 特点
- 多语言支持: Go、Python、PHP、Node.js 等
- 多种模式: SAGA、TCC、XA、二阶段消息
- 易用性: 提供简洁的 API
- 高性能: 支持高并发场景
- 可观测性: 提供完善的监控和日志
DTM 架构
1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────┐
│ 业务服务 │
└──────┬──────┘
│ HTTP/gRPC
┌──────▼──────┐
│ DTM Server │ ◄─── 事务协调
└──────┬──────┘
│
┌──────▼──────┐
│ MySQL/ │
│ Redis │ ◄─── 事务状态存储
└─────────────┘
DTM SAGA 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main
import (
"github.com/dtm-labs/client/dtmcli"
"github.com/dtm-labs/client/workflow"
)
// 定义服务 URL
const (
dtmServer = "http://localhost:36789/api/dtmsvr"
orderSvc = "http://localhost:8081"
stockSvc = "http://localhost:8082"
paySvc = "http://localhost:8083"
)
func CreateOrderSaga() error {
// 创建 SAGA 事务
saga := dtmcli.NewSaga(dtmServer, dtmcli.MustGenGid(dtmServer)).
// 添加子事务:创建订单
Add(orderSvc+"/create", orderSvc+"/create-compensate", &OrderReq{}).
// 添加子事务:扣减库存
Add(stockSvc+"/deduct", stockSvc+"/deduct-compensate", &StockReq{}).
// 添加子事务:扣减余额
Add(paySvc+"/deduct", paySvc+"/deduct-compensate", &PayReq{})
// 提交 SAGA 事务
err := saga.Submit()
return err
}
// 订单服务 - 正向操作
func CreateOrder(c *gin.Context) {
// 创建订单逻辑
// ...
c.JSON(200, gin.H{"result": "success"})
}
// 订单服务 - 补偿操作
func CreateOrderCompensate(c *gin.Context) {
// 取消订单逻辑
// ...
c.JSON(200, gin.H{"result": "success"})
}
DTM TCC 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func CreateOrderTCC() error {
// 创建 TCC 事务
gid := dtmcli.MustGenGid(dtmServer)
err := dtmcli.TccGlobalTransaction(dtmServer, gid, func(tcc *dtmcli.Tcc) error {
// 注册 Try-Confirm-Cancel 分支
// 库存服务
err := tcc.CallBranch(&StockReq{},
stockSvc+"/try",
stockSvc+"/confirm",
stockSvc+"/cancel")
if err != nil {
return err
}
// 支付服务
err = tcc.CallBranch(&PayReq{},
paySvc+"/try",
paySvc+"/confirm",
paySvc+"/cancel")
if err != nil {
return err
}
return nil
})
return err
}
Seata 框架详解
Seata(Simple Extensible Autonomous Transaction Architecture)是阿里开源的分布式事务解决方案。
Seata 架构
1
2
3
4
5
6
7
8
9
10
11
┌──────────────────────────────────────┐
│ Transaction Coordinator │
│ (TC Server) │
└────────────┬─────────────────────────┘
│
┌────────┼────────┐
│ │ │
┌───▼───┐ ┌─▼────┐ ┌─▼────┐
│ TM │ │ RM │ │ RM │
│事务管理│ │资源管理│ │资源管理│
└───────┘ └──────┘ └──────┘
三大角色:
- TC (Transaction Coordinator): 事务协调器,维护全局和分支事务状态
- TM (Transaction Manager): 事务管理器,定义全局事务范围
- RM (Resource Manager): 资源管理器,管理分支事务
Seata 事务模式
1. AT 模式(自动补偿)
1
2
3
4
5
6
7
8
9
10
11
@GlobalTransactional
public void createOrder() {
// 创建订单
orderService.create();
// 扣减库存
stockService.deduct();
// 扣减余额
accountService.deduct();
}
特点:
- 无业务侵入,基于 SQL 解析
- 自动生成反向 SQL 进行补偿
- 适合大部分场景
2. TCC 模式
1
2
3
4
5
6
7
8
9
10
@LocalTCC
public interface StockService {
@TwoPhaseBusinessAction(name = "deductStock", commitMethod = "confirm", rollbackMethod = "cancel")
boolean prepare(@BusinessActionContextParameter(paramName = "stockId") String stockId,
@BusinessActionContextParameter(paramName = "count") int count);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context);
}
3. SAGA 模式
通过状态机引擎编排事务流程。
4. XA 模式
基于数据库的 XA 协议,强一致性但性能较差。
Seata 配置示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
seata:
enabled: true
application-id: order-service
tx-service-group: my_test_tx_group
service:
vgroup-mapping:
my_test_tx_group: default
grouplist:
default: 127.0.0.1:8091
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: seata
group: SEATA_GROUP
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: seata
group: SEATA_GROUP
DTM vs Seata 对比
| 特性 | DTM | Seata |
|---|---|---|
| 语言支持 | 多语言(Go、Python、PHP 等) | 主要 Java |
| 事务模式 | SAGA、TCC、XA、二阶段消息 | AT、TCC、SAGA、XA |
| 性能 | 高性能,Go 实现 | 较好,Java 实现 |
| 易用性 | API 简洁,学习曲线平缓 | 功能丰富,配置复杂 |
| 社区 | 活跃,文档完善 | 阿里支持,社区庞大 |
| 适用场景 | 多语言微服务 | Java 微服务生态 |
| 部署 | 轻量级,单机可运行 | 需要配置中心、注册中心 |
最佳实践
1. 选择合适的事务模式
- 强一致性要求: 使用 TCC 或 XA
- 最终一致性可接受: 使用 SAGA 或消息事务
- 性能优先: 使用 SAGA 或本地消息表
- 开发成本考虑: 使用 AT 模式(Seata)或 SAGA
2. 幂等性设计
所有分布式事务操作必须保证幂等性:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 使用唯一 ID 保证幂等
func DeductStock(orderId string, productId string, count int) error {
// 检查是否已执行
if IsProcessed(orderId) {
return nil
}
// 执行业务逻辑
err := doDeduct(productId, count)
if err != nil {
return err
}
// 记录执行状态
MarkProcessed(orderId)
return nil
}
3. 超时处理
设置合理的超时时间,避免资源长时间占用:
1
2
3
4
5
saga := dtmcli.NewSaga(dtmServer, gid).
SetOptions(&dtmcli.TransOptions{
TimeoutToFail: 60, // 60秒后失败
RetryInterval: 10, // 重试间隔10秒
})
4. 监控和告警
- 监控事务成功率
- 监控事务耗时
- 监控补偿操作执行情况
- 设置异常告警
5. 降级策略
当分布式事务框架不可用时:
- 使用本地事务 + 异步补偿
- 记录失败日志,人工介入
- 提供手动补偿接口
常见问题
1. 如何处理网络超时?
- 设置合理的超时时间
- 实现重试机制
- 确保操作幂等性
2. 如何保证补偿操作一定成功?
- 补偿操作要简单可靠
- 实现重试机制
- 记录失败日志,人工介入
3. 分布式事务性能如何优化?
- 减少事务参与方数量
- 异步化非核心操作
- 使用缓存减少数据库访问
- 选择合适的事务模式
4. 如何处理数据不一致?
- 实现对账系统
- 定期检查数据一致性
- 提供数据修复工具
总结
分布式事务是微服务架构中的重要组成部分,需要根据业务场景选择合适的解决方案:
- 强一致性场景: 使用 TCC 或 XA 模式
- 最终一致性场景: 使用 SAGA 或消息事务
- Java 生态: 优先考虑 Seata
- 多语言环境: 优先考虑 DTM
无论选择哪种方案,都需要注意:
- 保证操作幂等性
- 实现完善的监控和告警
- 设计降级和补偿策略
- 做好异常处理和日志记录