分布式事务解决方案:最佳实践

技术_宋玉_工程师
2025-12-16 05:24
阅读 779

凌晨两点,咖啡已经凉了三杯,我的 VSCode 里还开着三个 terminal。一边在跑一个本地的 Seata demo,一边刷 LeetCode 上的 hard 题——别问我为什么这么拼,谁让我正在准备跳槽呢?白天要维护公司技术中台那堆“祖传代码”,晚上还得给自己充电,不然简历上就只有“参与分布式系统建设”这种模糊得不能再模糊的描述。

说到分布式系统,上周五上线前夜差点没把我送走。事情是这样的:我们团队负责的订单中心最近要做一次大重构,把原来单体架构拆成微服务。结果一测就炸了——用户付完钱,订单创建成功了,但库存没扣,优惠券也没用。测试妹子直接在群里@我:“你们这系统是想让用户白嫖吗?”

那一刻我真的想砸电脑。不是因为 bug 多难搞,而是因为——分布式事务这个老冤家,又来找我麻烦了。


为什么分布式事务这么让人头大?

先说背景。我们公司是一家电商上市公司,技术中台团队负责支撑所有业务线的通用能力,比如订单、支付、库存、用户中心等。以前大家挤在一个大单体里,数据库 ACID 保证得好好的,事务?那不就是 BEGIN / COMMIT 的事嘛。

但自从去年双11前老板拍板“全面微服务化”,我们的世界就变了。订单服务用 MySQL,库存服务用 PostgreSQL,优惠券服务甚至用了 MongoDB。跨服务、跨数据库,本地事务完全失效。更惨的是,产品经理还要求“下单必须原子成功”——要么全成功,要么全失败,不能半途而废。

于是,分布式事务成了每个后端工程师的噩梦。你可能会说:“上两阶段提交(2PC)不就行了?” 理论上没错,但实际用起来……emmm,性能差到连运维大哥都看不下去。


几种主流方案对比:别再只谈理论了

市面上常见的分布式事务方案有好几种,我挨个试过(或者说,被线上事故逼着试过),总结如下:

方案 原理 优点 缺点 适用场景
2PC / XA 强一致性,协调者控制提交 数据强一致 性能差、阻塞、对 DB 侵入强 金融核心系统(容忍低并发)
TCC Try-Confirm-Cancel 三阶段 灵活、性能较好 业务侵入极强,开发成本高 高一致性要求、可改造业务
Saga 事件驱动 + 补偿机制 异步、高吞吐 最终一致性,补偿逻辑复杂 长流程、允许短暂不一致
消息队列+本地消息表 利用 MQ 保证最终一致 实现简单、依赖现有组件 有延迟,需处理消息重复 中小公司、快速落地

我们团队一开始尝试了 TCC,结果写了一周代码,发现每个接口都要写 tryOrder, confirmOrder, cancelOrder 三个方法,产品经理看到 PR 直接问:“这确定不是为了凑 KPI?”

后来转向 基于消息队列的最终一致性方案,配合本地消息表,用 Python 实现,既满足了业务需求,又没让团队崩溃。下面我就重点聊聊这套方案的最佳实践。


我们的实战方案:Python + RabbitMQ + 本地消息表

核心思想

在本地事务中同时写业务数据和消息记录,再由后台任务异步发送消息到 MQ,下游消费后执行本地操作。

听起来很简单?但魔鬼在细节里。我们踩过的坑,你最好别再踩。

架构图(脑内想象版)

[Order Service] 
   │
   ├── 写订单表 (MySQL)
   ├── 写本地消息表 (message_outbox)
   └── 提交本地事务
         │
         ▼
[Message Relay Worker] ← 定时轮询 message_outbox
         │
         ▼
[RabbitMQ] → [Inventory Service] → 扣库存
            → [Coupon Service]    → 使用优惠券

关键点:本地事务包含业务 + 消息记录,确保“要么都写,要么都不写”。


关键代码实现(Python 版)

我们用的是 Flask + SQLAlchemy + RabbitMQ。别吐槽 Flask 不够“高大上”,在我们公司,能跑就行(狗头保命)。

1. 本地消息表设计

# models.py
class MessageOutbox(db.Model):
    __tablename__ = 'message_outbox'
    
    id = db.Column(db.Integer, primary_key=True)
    event_type = db.Column(db.String(64), nullable=False)  # e.g., 'order_created'
    payload = db.Column(db.JSON, nullable=False)           # 消息内容
    status = db.Column(db.String(16), default='pending')  # pending / sent / failed
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    sent_at = db.Column(db.DateTime, nullable=True)

2. 下单接口:业务 + 消息写入同一事务

# order_service.py
def create_order(user_id, items, coupon_id):
    try:
        # 开启事务
        with db.session.begin():
            # 1. 创建订单
            order = Order(user_id=user_id, status='created')
            db.session.add(order)
            db.session.flush()  # 获取 order.id

            # 2. 写入本地消息表
            message = MessageOutbox(
                event_type='order_created',
                payload={
                    'order_id': order.id,
                    'user_id': user_id,
                    'items': items,
                    'coupon_id': coupon_id
                }
            )
            db.session.add(message)

        # 注意:这里没有发消息!只是写库
        return order.id

    except Exception as e:
        # 事务自动回滚
        logger.error(f"Create order failed: {e}")
        raise

为什么不在事务里直接发 MQ?
因为网络调用不可靠!如果 MQ 挂了,整个事务就得回滚,但订单可能已经扣了用户钱(假设支付是同步的),这就出大事了。所以只写本地表,不碰外部系统


3. 消息中继服务(Relay Worker)

这是最容易出问题的地方。我们一开始用 time.sleep(1) 轮询,结果 CPU 占用爆了。后来改用 指数退避 + 批量处理

# message_relay.py
import pika
import time
import random

def send_to_mq(message):
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
        channel = connection.channel()
        channel.basic_publish(
            exchange='',
            routing_key=message.event_type,
            body=json.dumps(message.payload),
            properties=pika.BasicProperties(delivery_mode=2)  # 持久化
        )
        connection.close()
        return True
    except Exception as e:
        logger.error(f"Send to MQ failed: {e}")
        return False

def relay_messages():
    while True:
        try:
            # 批量捞取 pending 消息
            messages = MessageOutbox.query.filter_by(status='pending').limit(50).all()
            
            for msg in messages:
                if send_to_mq(msg):
                    msg.status = 'sent'
                    msg.sent_at = datetime.utcnow()
                else:
                    # 失败次数太多?标记为 failed 人工介入
                    if get_retry_count(msg.id) > 5:
                        msg.status = 'failed'
                    else:
                        # 稍后再试(指数退避)
                        time.sleep(random.uniform(0.1, 2.0))
            
            db.session.commit()
            
            # 如果没消息,睡久一点省资源
            if not messages:
                time.sleep(5)
                
        except Exception as e:
            logger.exception("Relay worker error")
            time.sleep(10)

Tips:

  • 消息一定要持久化(delivery_mode=2
  • 处理失败要重试,但别无限重试
  • 批量处理提升吞吐

4. 下游服务:幂等性是生命线!

库存服务收到 order_created 消息后,第一件事不是扣库存,而是检查是否处理过

# inventory_service.py
@app.route('/consume/order_created', methods=['POST'])
def handle_order_created():
    data = request.json
    order_id = data['order_id']
    
    # 幂等检查:用 order_id 作为唯一键
    if is_order_processed(order_id):
        return {'status': 'already_processed'}
    
    try:
        deduct_inventory(data['items'])
        mark_order_as_processed(order_id)  # 记录已处理
        return {'status': 'ok'}
    except InventoryNotEnough:
        # 库存不足?发补偿消息(比如取消订单)
        send_compensation_message(order_id, 'inventory_shortage')
        return {'status': 'compensated'}

幂等性怎么做?
最简单:建一张 processed_events 表,存 (event_id, service_name)。每次消费前查一下。


GitHub 上有哪些好工具可以抄作业?

别 reinvent the wheel!我在准备跳槽刷题时,顺手扒拉了不少开源项目,推荐几个靠谱的:

项目 语言 特点 地址
dtm Go/多语言支持 支持 TCC/Saga/XA,文档超全 https://github.com/dtm-labs/dtm
Eventuate Tram Java 基于事件和 CDC 的方案 https://github.com/eventuate-tram/eventuate-tram-core
django-outbox-pattern Python Django 专用本地消息表实现 https://github.com/vintasoftware/django-outbox-pattern

虽然我们没直接用这些,但设计思路值得借鉴。特别是 dtm,如果你用 Go,闭眼冲就行。


生产环境血泪教训

  1. 消息重复是常态
    RabbitMQ 在 ack 丢失时会重发。所以下游必须幂等,否则用户买一件商品,库存扣三次。

  2. 监控比代码更重要
    我们加了两个关键监控:

    • message_outboxpending 消息堆积数
    • failed 消息告警(超过 3 条就钉钉轰炸)
  3. 补偿机制不能少
    曾经因为库存服务宕机 2 小时,导致几百个订单卡住。后来加了定时任务:30 分钟未完成的订单自动取消,并退优惠券。

  4. 别信“最终一致”等于“随便延迟”
    用户付完钱,5 秒后发现订单还在“处理中”,就要骂娘了。我们优化后,95% 的消息在 200ms 内送达。


性能数据(真实上线后)

指标 优化前 优化后
平均下单耗时 1200ms 380ms
消息投递成功率 92% 99.98%
人工干预订单数/天 15+ <1

关键优化点:

  • 消息表加索引:status + created_at
  • Relay Worker 改为多进程(Python GIL 你懂的)
  • RabbitMQ 集群 + 镜像队列

最后:关于跳槽和成长

写这篇文章的时候,我又刷完一道 LeetCode。说实话,现在面试官动不动就问“你怎么保证分布式事务一致性”,光背八股文肯定不行。只有真正在线上被毒打过,才能讲出细节

我们这套方案虽然不算最 fancy,但在 Python 技术栈 + 中等规模电商 场景下,稳定、可维护、易理解。毕竟,代码是给人看的,其次才是给机器跑的——这是我最近刷 Clean Code 时最大的感悟。

如果你也在被分布式事务折磨,不妨试试本地消息表 + MQ 的组合。它可能不够“学术”,但足够“实用”。

对了,我们团队还在招人(别问,问就是缺人)。如果你也喜欢深夜写代码、讨厌 PPT 工程师、并且能接受偶尔的线上救火……欢迎来聊。简历请附上 GitHub 链接,别只放课程设计,谢谢。


彩蛋:上周五上线后,测试妹子请我喝了杯瑞幸。她说:“这次终于没让我白嫖。”
我回她:“下次再出 bug,我就真让你白嫖了。”
—— 程序员的幽默,大概就是这么朴实无华。

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝