分布式事务解决方案:最佳实践 —— 一个传统企业 Java 程序员的血泪总结

罗华
2025-12-19 06:04
阅读 925

大家好,我是老李,目前在一家传统制造业企业的 IT 部门干 Java 开发,快两年了。公司正在搞“数字化转型”这种高大上的词儿,说白了就是把老旧系统慢慢换成微服务架构。我日常主要负责订单、库存、财务这几个核心模块的对接,顺便研究开源项目源码解闷(毕竟我们用的很多中间件都是 GitHub 上扒下来的)。最近还迷上了前端动画——别笑,写完一堆 synchronized@Transactional 后,看到 CSS 动画丝滑过渡真的能治 PTSD。

上周五晚上九点,我在工位上啃着冷掉的黄焖鸡,盯着屏幕上一串报错:

org.springframework.transaction.CannotCreateTransactionException: 
Could not create local transaction - nested exception is ...

那一刻,我真的想砸电脑。

这事得从我们去年双11说起。


背景:一场“跨系统”的灾难

我们公司原本是单体应用,所有业务都在一个 Oracle 数据库里跑。去年领导拍板要做微服务拆分,订单、库存、账务各自独立部署,数据库也拆了。听起来很美好,但问题很快就来了。

比如用户下单:

  1. 订单服务 创建订单(写自己的 MySQL)
  2. 库存服务 扣减库存(另一个 MySQL)
  3. 账务服务 冻结金额(又一个 MySQL)

这三个操作必须全部成功或全部失败,否则就会出现“钱扣了但没发货”或者“货发了但没扣钱”这种社死现场。产品经理当时轻飘飘一句:“你们后端自己保证数据一致性呗”,我差点当场表演一个原地升天。

更坑的是,我们团队里还有个 Go 语言爱好者(运维组借调过来的),天天安利用 Go 写服务性能多高、goroutine 多牛,甚至偷偷在测试环境搭了个基于 dapr 的原型。虽然我内心抗拒(Java 老狗不想学新语言),但也不得不承认:分布式事务这事儿,真不能靠 @Transactional 硬扛了。


踩坑之路:从 2PC 到最终一致性

一开始,我们天真地尝试了 XA 两阶段提交(2PC)。Spring Boot + Atomikos 搞起来很快,本地测试也没问题。结果一上预发环境,TPS 直接掉到个位数。运维小哥一脸嫌弃:“你们这数据库连接池都占满了,其他服务连不上了!”

查了下资料才明白:2PC 在协调阶段会锁住资源,高并发下简直就是性能杀手。而且我们用的 MySQL 8.0 虽然支持 XA,但实际线上一旦某个节点超时,整个事务就卡住,还得人工介入回滚——这谁顶得住?

被领导约谈三次后,我们痛定思痛,转向 最终一致性方案。主流有三种:TCC、Saga、可靠消息(MQ)。

  • TCC:需要每个服务提供 Try/Confirm/Cancel 三个接口,改造成本太高,库存那边直接拒绝。
  • Saga:适合长流程,但我们下单就三步,太重了。
  • 可靠消息:用 RocketMQ 的事务消息,看起来最轻量。

于是,我们选了 RocketMQ 事务消息 + 本地事务表 这套组合拳。


实战:用 RocketMQ 事务消息搞定下单流程

架构设计

用户请求 → 订单服务(创建订单 + 发送半消息) 
               ↓
       RocketMQ Broker(半消息暂存)
               ↓
订单服务执行本地事务(写订单表 + 本地事务日志)
               ↓
根据本地事务结果,Commit 或 Rollback 消息
               ↓
库存服务消费消息 → 扣库存 → 发送确认消息
               ↓
账务服务消费确认消息 → 冻结金额

关键点在于:本地事务和消息发送必须原子。我们用了 RocketMQ 提供的 TransactionListener 接口。

核心代码

订单服务中,发送事务消息的部分:

// OrderService.java
public void createOrder(OrderDTO dto) {
    TransactionMQProducer producer = rocketMQTemplate.getProducer();
    
    Message<String> msg = new Message<>("ORDER_TOPIC", JSON.toJSONString(dto).getBytes());
    
    TransactionSendResult sendResult = producer.sendMessageInTransaction(
        msg, 
        new OrderTransactionListener(), // 自定义监听器
        null
    );
}

自定义的 TransactionListener

public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderRepository orderRepo;

    @Autowired
    private TransactionLogRepository logRepo;

    // 执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 1. 写订单
            Order order = buildOrderFromMsg(msg);
            orderRepo.save(order);
            
            // 2. 写事务日志(状态为 PENDING)
            TransactionLog log = new TransactionLog();
            log.setBizId(order.getId());
            log.setStatus("PENDING");
            logRepo.save(log);
            
            return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
            
        } catch (Exception e) {
            log.error("本地事务失败", e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // 回查机制(Broker 不确定事务状态时调用)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String bizId = extractBizId(msg);
        TransactionLog log = logRepo.findByBizId(bizId);
        
        if (log == null) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        
        return "COMMITTED".equals(log.getStatus()) 
            ? LocalTransactionState.COMMIT_MESSAGE 
            : LocalTransactionState.UNKNOW;
    }
}

💡 为什么要有事务日志表?
因为 checkLocalTransaction 可能被多次调用(网络抖动、Broker 重启等),必须有个幂等依据。日志表就是我们的“事实来源”。

库存服务那边就简单多了,普通消费者:

@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "inventory-group")
public class InventoryConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        try {
            OrderDTO dto = JSON.parseObject(message, OrderDTO.class);
            inventoryService.decrease(dto.getSkuId(), dto.getQuantity());
            
            // 发送确认消息给账务
            rocketMQTemplate.convertAndSend("ACCOUNT_TOPIC", dto);
            
        } catch (Exception e) {
            // 记录失败日志,后续人工补偿 or 定时重试
            compensationService.record(dto, "INVENTORY_DECREASE_FAILED");
            throw new RuntimeException(e); // 触发消息重试
        }
    }
}

踩过的坑 & 生产经验

1. 消息重复消费?幂等性是底线!

RocketMQ 保证至少一次投递,所以消费者必须做幂等。我们在库存服务加了去重表:

CREATE TABLE inventory_dedup (
    order_id VARCHAR(64) PRIMARY KEY,
    created_at TIMESTAMP
);

每次扣库存前先 INSERT IGNORE,失败就说明处理过了。

2. 补偿机制不能少

再完美的设计也会翻车。我们做了三层兜底:

  • 自动重试:RocketMQ 默认 16 次重试(间隔递增)
  • 定时任务扫描:每 5 分钟查 transaction_log 中超过 10 分钟还是 PENDING 的记录,触发人工审核
  • 人工干预后台:运维可以直接点击“强制回滚”或“强制确认”

3. 监控告警要跟上

在 Grafana 上配了几个关键指标:

  • 事务消息积压量
  • 本地事务失败率
  • 补偿任务执行次数

一旦异常,钉钉机器人立刻 @ 全体后端。上周就有一次因为数据库主从延迟导致库存扣超了,幸好监控及时发现。


性能对比:2PC vs 事务消息

方案 TPS(4核8G) 数据一致性 改造成本 运维复杂度
XA 2PC ~15 强一致 高(需 DBA 配合)
RocketMQ 事务消息 ~800 最终一致
TCC ~1200 最终一致

注:测试环境为 3 节点 MySQL 8.0 + RocketMQ 5.0,模拟 100 并发下单

明显看出,事务消息在性能和落地成本上更平衡。虽然不是强一致,但在我们业务场景下(用户容忍秒级延迟),完全够用。


关于 Go 和 GitHub 的碎碎念

说到 Go,其实我们后来真用它写了个补偿任务调度器。Go 的 goroutine + channel 处理定时任务确实优雅,代码量比 Java 少一半。项目扔在公司私有 GitLab 上了,但核心思路参考了 GitHub 上一个叫 dtm 的开源项目——一个 Go 写的分布式事务框架,Star 快 20k 了,挺火。

不过作为 Java 老狗,我还是觉得:工具不重要,思路才重要。无论你用 Spring Cloud Alibaba 的 Seata,还是 Go 的 dtm,本质都是解决“跨系统原子操作”这个千年难题。


最后:心得体会

搞完这套方案,我最大的感悟是:

分布式事务没有银弹,只有“适合业务”的权衡。

我们放弃了强一致,换来了可维护性和性能;牺牲了部分实时性,保住了系统稳定性。上线三个月,0 起资损事故,双11峰值 TPS 600+,领导终于不再问“为啥不能像淘宝那样秒杀”了(笑)。

如果你也在传统企业搞数字化转型,别一上来就想上 Seata 或 Saga。先问问业务:到底能不能容忍短暂的不一致? 如果能,可靠消息+本地事务表,真香。

对了,代码已脱敏上传公司内部 GitHub(其实是 GitLab),欢迎 fork……哦不对,我们用的是 SVN。唉,传统企业的痛,谁懂。


彩蛋:上周团建,产品经理敬我酒说:“老李,你们后端现在稳得很啊!”
我回他:“下次改需求前,先请我喝杯星巴克。”
他说:“行,拿券吧——电子券,有效期 5 分钟,过期不候。”
……这 TM 是不是在嘲讽我们的最终一致性? 😅

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝