分布式事务解决方案:最佳实践
凌晨两点,咖啡已经凉了三杯,我的 VSCode 里还开着三个 terminal。一边在跑一个本地的 Seata demo,一边刷 LeetCode 上的 hard 题——别问我为什么这么拼,谁让我正在准备跳槽呢?白天要维护公司技术中台那堆“祖传代码”,晚上还得给自己充电,不然简历上就只有“参与分布式系统建设”这种模糊得不能再模糊的描述。
说到分布式系统,上周五上线前夜差点没把我送走。事情是这样的:我们团队负责的订单中心最近要做一次大重构,把原来单体架构拆成微服务。结果一测就炸了——用户付完钱,订单创建成功了,但库存没扣,优惠券也没用。测试妹子直接在群里@我:“你们这系统是想让用户白嫖吗?”
那一刻我真的想砸电脑。不是因为 bug 多难搞,而是因为——分布式事务这个老冤家,又来找我麻烦了。
为什么分布式事务这么让人头大?
先说背景。我们公司是一家电商上市公司,技术中台团队负责支撑所有业务线的通用能力,比如订单、支付、库存、用户中心等。以前大家挤在一个大单体里,数据库 ACID 保证得好好的,事务?那不就是 BEGIN / COMMIT 的事嘛。
但自从去年双11前老板拍板“全面微服务化”,我们的世界就变了。订单服务用 MySQL,库存服务用 PostgreSQL,优惠券服务甚至用了 MongoDB。跨服务、跨数据库,本地事务完全失效。更惨的是,产品经理还要求“下单必须原子成功”——要么全成功,要么全失败,不能半途而废。
于是,分布式事务成了每个后端工程师的噩梦。你可能会说:“上两阶段提交(2PC)不就行了?” 理论上没错,但实际用起来……emmm,性能差到连运维大哥都看不下去。
几种主流方案对比:别再只谈理论了
市面上常见的分布式事务方案有好几种,我挨个试过(或者说,被线上事故逼着试过),总结如下:
| 方案 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 2PC / XA | 强一致性,协调者控制提交 | 数据强一致 | 性能差、阻塞、对 DB 侵入强 | 金融核心系统(容忍低并发) |
| TCC | Try-Confirm-Cancel 三阶段 | 灵活、性能较好 | 业务侵入极强,开发成本高 | 高一致性要求、可改造业务 |
| Saga | 事件驱动 + 补偿机制 | 异步、高吞吐 | 最终一致性,补偿逻辑复杂 | 长流程、允许短暂不一致 |
| 消息队列+本地消息表 | 利用 MQ 保证最终一致 | 实现简单、依赖现有组件 | 有延迟,需处理消息重复 | 中小公司、快速落地 |
我们团队一开始尝试了 TCC,结果写了一周代码,发现每个接口都要写 tryOrder, confirmOrder, cancelOrder 三个方法,产品经理看到 PR 直接问:“这确定不是为了凑 KPI?”
后来转向 基于消息队列的最终一致性方案,配合本地消息表,用 Python 实现,既满足了业务需求,又没让团队崩溃。下面我就重点聊聊这套方案的最佳实践。
我们的实战方案:Python + RabbitMQ + 本地消息表
核心思想
在本地事务中同时写业务数据和消息记录,再由后台任务异步发送消息到 MQ,下游消费后执行本地操作。
听起来很简单?但魔鬼在细节里。我们踩过的坑,你最好别再踩。
架构图(脑内想象版)
[Order Service]
│
├── 写订单表 (MySQL)
├── 写本地消息表 (message_outbox)
└── 提交本地事务
│
▼
[Message Relay Worker] ← 定时轮询 message_outbox
│
▼
[RabbitMQ] → [Inventory Service] → 扣库存
→ [Coupon Service] → 使用优惠券
关键点:本地事务包含业务 + 消息记录,确保“要么都写,要么都不写”。
关键代码实现(Python 版)
我们用的是 Flask + SQLAlchemy + RabbitMQ。别吐槽 Flask 不够“高大上”,在我们公司,能跑就行(狗头保命)。
1. 本地消息表设计
# models.py
class MessageOutbox(db.Model):
__tablename__ = 'message_outbox'
id = db.Column(db.Integer, primary_key=True)
event_type = db.Column(db.String(64), nullable=False) # e.g., 'order_created'
payload = db.Column(db.JSON, nullable=False) # 消息内容
status = db.Column(db.String(16), default='pending') # pending / sent / failed
created_at = db.Column(db.DateTime, default=datetime.utcnow)
sent_at = db.Column(db.DateTime, nullable=True)
2. 下单接口:业务 + 消息写入同一事务
# order_service.py
def create_order(user_id, items, coupon_id):
try:
# 开启事务
with db.session.begin():
# 1. 创建订单
order = Order(user_id=user_id, status='created')
db.session.add(order)
db.session.flush() # 获取 order.id
# 2. 写入本地消息表
message = MessageOutbox(
event_type='order_created',
payload={
'order_id': order.id,
'user_id': user_id,
'items': items,
'coupon_id': coupon_id
}
)
db.session.add(message)
# 注意:这里没有发消息!只是写库
return order.id
except Exception as e:
# 事务自动回滚
logger.error(f"Create order failed: {e}")
raise
为什么不在事务里直接发 MQ?
因为网络调用不可靠!如果 MQ 挂了,整个事务就得回滚,但订单可能已经扣了用户钱(假设支付是同步的),这就出大事了。所以只写本地表,不碰外部系统。
3. 消息中继服务(Relay Worker)
这是最容易出问题的地方。我们一开始用 time.sleep(1) 轮询,结果 CPU 占用爆了。后来改用 指数退避 + 批量处理。
# message_relay.py
import pika
import time
import random
def send_to_mq(message):
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.basic_publish(
exchange='',
routing_key=message.event_type,
body=json.dumps(message.payload),
properties=pika.BasicProperties(delivery_mode=2) # 持久化
)
connection.close()
return True
except Exception as e:
logger.error(f"Send to MQ failed: {e}")
return False
def relay_messages():
while True:
try:
# 批量捞取 pending 消息
messages = MessageOutbox.query.filter_by(status='pending').limit(50).all()
for msg in messages:
if send_to_mq(msg):
msg.status = 'sent'
msg.sent_at = datetime.utcnow()
else:
# 失败次数太多?标记为 failed 人工介入
if get_retry_count(msg.id) > 5:
msg.status = 'failed'
else:
# 稍后再试(指数退避)
time.sleep(random.uniform(0.1, 2.0))
db.session.commit()
# 如果没消息,睡久一点省资源
if not messages:
time.sleep(5)
except Exception as e:
logger.exception("Relay worker error")
time.sleep(10)
Tips:
- 消息一定要持久化(
delivery_mode=2) - 处理失败要重试,但别无限重试
- 批量处理提升吞吐
4. 下游服务:幂等性是生命线!
库存服务收到 order_created 消息后,第一件事不是扣库存,而是检查是否处理过。
# inventory_service.py
@app.route('/consume/order_created', methods=['POST'])
def handle_order_created():
data = request.json
order_id = data['order_id']
# 幂等检查:用 order_id 作为唯一键
if is_order_processed(order_id):
return {'status': 'already_processed'}
try:
deduct_inventory(data['items'])
mark_order_as_processed(order_id) # 记录已处理
return {'status': 'ok'}
except InventoryNotEnough:
# 库存不足?发补偿消息(比如取消订单)
send_compensation_message(order_id, 'inventory_shortage')
return {'status': 'compensated'}
幂等性怎么做?
最简单:建一张 processed_events 表,存 (event_id, service_name)。每次消费前查一下。
GitHub 上有哪些好工具可以抄作业?
别 reinvent the wheel!我在准备跳槽刷题时,顺手扒拉了不少开源项目,推荐几个靠谱的:
| 项目 | 语言 | 特点 | 地址 |
|---|---|---|---|
| dtm | Go/多语言支持 | 支持 TCC/Saga/XA,文档超全 | https://github.com/dtm-labs/dtm |
| Eventuate Tram | Java | 基于事件和 CDC 的方案 | https://github.com/eventuate-tram/eventuate-tram-core |
| django-outbox-pattern | Python | Django 专用本地消息表实现 | https://github.com/vintasoftware/django-outbox-pattern |
虽然我们没直接用这些,但设计思路值得借鉴。特别是 dtm,如果你用 Go,闭眼冲就行。
生产环境血泪教训
消息重复是常态
RabbitMQ 在 ack 丢失时会重发。所以下游必须幂等,否则用户买一件商品,库存扣三次。监控比代码更重要
我们加了两个关键监控:message_outbox中pending消息堆积数failed消息告警(超过 3 条就钉钉轰炸)
补偿机制不能少
曾经因为库存服务宕机 2 小时,导致几百个订单卡住。后来加了定时任务:30 分钟未完成的订单自动取消,并退优惠券。别信“最终一致”等于“随便延迟”
用户付完钱,5 秒后发现订单还在“处理中”,就要骂娘了。我们优化后,95% 的消息在 200ms 内送达。
性能数据(真实上线后)
| 指标 | 优化前 | 优化后 |
|---|---|---|
| 平均下单耗时 | 1200ms | 380ms |
| 消息投递成功率 | 92% | 99.98% |
| 人工干预订单数/天 | 15+ | <1 |
关键优化点:
- 消息表加索引:
status + created_at - Relay Worker 改为多进程(Python GIL 你懂的)
- RabbitMQ 集群 + 镜像队列
最后:关于跳槽和成长
写这篇文章的时候,我又刷完一道 LeetCode。说实话,现在面试官动不动就问“你怎么保证分布式事务一致性”,光背八股文肯定不行。只有真正在线上被毒打过,才能讲出细节。
我们这套方案虽然不算最 fancy,但在 Python 技术栈 + 中等规模电商 场景下,稳定、可维护、易理解。毕竟,代码是给人看的,其次才是给机器跑的——这是我最近刷 Clean Code 时最大的感悟。
如果你也在被分布式事务折磨,不妨试试本地消息表 + MQ 的组合。它可能不够“学术”,但足够“实用”。
对了,我们团队还在招人(别问,问就是缺人)。如果你也喜欢深夜写代码、讨厌 PPT 工程师、并且能接受偶尔的线上救火……欢迎来聊。简历请附上 GitHub 链接,别只放课程设计,谢谢。
彩蛋:上周五上线后,测试妹子请我喝了杯瑞幸。她说:“这次终于没让我白嫖。”
我回她:“下次再出 bug,我就真让你白嫖了。”
—— 程序员的幽默,大概就是这么朴实无华。

评论 0