分布式事务解决方案:从“产品狗”到“码农”的血泪实战
上周五晚上九点半,我瘫在工位上,耳机里放着 Lo-fi Hip Hop(别笑,边听音乐边写代码真的能救命),盯着 K8s 集群里疯狂重启的 Pod,心里默默问候了产品经理祖宗十八代。事情起因很简单:我们新上的“会员积分+优惠券”联合营销活动,用户下单时要同时扣积分、发券、更新订单状态——三个服务,三个数据库,结果双11预热第一天就炸了。
作为那个曾经坐在会议室里画流程图、喊“这个需求很简单”的前产品经理,如今亲手被自己当年埋下的雷炸飞,只能说:因果循环,报应不爽。
但骂归骂,锅还得背。毕竟现在我是后端工程师,而且还是个自诩“懂产品”的技术人。今天这篇,就聊聊我们在北京这间加班文化浓厚的互联网公司里,如何用 Python + 云原生架构搞定分布式事务这个“烫手山芋”。
背景:不是所有“简单需求”都简单
事情发生在去年 Q3。产品团队拍脑袋决定搞个“积分抵现 + 自动发券”联动玩法,美其名曰“提升用户 LTV”。技术评审会上,我看着那张花里胡哨的流程图,冷汗直流:
- 用户下单 → 扣减账户积分(积分服务)
- 同时生成一张专属优惠券(营销服务)
- 再更新订单状态为“已支付”(订单服务)
三个独立微服务,三个 MySQL 实例,跨库操作。更要命的是,不能出现“扣了积分但没发券”或者“发了券但订单失败” 的情况——否则用户投诉分分钟打爆客服电话。
测试同学当场灵魂拷问:“如果发券服务挂了,积分回滚吗?”
我弱弱回答:“呃……目前没有。”
那一刻,我知道,分布式事务这课,迟早得补。
方案选型:别被理论绕晕,先看落地成本
市面上分布式事务方案不少:2PC、TCC、Saga、本地消息表、Seata……听起来都很牛,但对我们这种小团队(5个后端 + 2个运维 + 我这个“半路出家”的),综合考量下来,必须满足几个现实条件:
- 学习曲线平缓:不能让我这种刚转码一年的人看三天文档还配不对
- 对现有代码侵入小:Python 项目已经跑了一年多,重构成本太高
- 云原生友好:我们全栈跑在 K8s 上,最好能和 Operator/Service Mesh 融合
- 有可观测性:线上出问题能快速定位,别让用户等三天才知道券没发
对比一圈,最终我们选择了 “可靠消息最终一致性” + 补偿机制 的组合拳。为什么?因为:
- 不依赖全局锁,性能损耗小
- 消息队列我们已经在用(Kafka)
- 补偿逻辑可以复用现有的任务重试框架
当然,也有人推荐 Seata,但它的 AT 模式对 MySQL 有特定要求,且 Python 客户端生态不如 Java 成熟。作为务实派,我们选择了更“接地气”的方案。
实战:用 Python 实现可靠消息事务
核心思路就两点:
- 本地事务 + 消息落库:先在本地事务中完成业务操作,并将待发送的消息写入同一数据库的
outbox表 - 异步投递 + 幂等消费:后台任务扫描
outbox,投递消息;消费者做幂等处理
第一步:改造订单服务(关键!)
# models.py
class Order(models.Model):
user_id = models.IntegerField()
status = models.CharField(max_length=20)
# ...其他字段
class OutboxMessage(models.Model):
topic = models.CharField(max_length=100)
payload = models.JSONField() # 存序列化后的消息体
status = models.CharField(max_length=20, default="pending") # pending / sent
created_at = models.DateTimeField(auto_now_add=True)
# services/order_service.py
from django.db import transaction
import json
def create_order_with_points(user_id, amount):
with transaction.atomic():
# 1. 扣减积分(调用积分服务 API,但先不 commit)
points_service.deduct_points(user_id, amount)
# 2. 创建订单
order = Order.objects.create(user_id=user_id, status="paid")
# 3. 写入 outbox 消息(关键!和订单在同一事务)
OutboxMessage.objects.create(
topic="coupon.issue",
payload={
"user_id": user_id,
"order_id": order.id,
"coupon_type": "WELCOME_20"
}
)
return order
注意:这里 points_service.deduct_points 实际是 HTTP 调用,但我们在积分服务端做了预占机制(类似 TCC 的 Try 阶段),只有收到确认才真正扣减。如果后续发券失败,我们会调用 /cancel 接口释放预占积分。
第二步:后台投递任务(跑在 Celery 里)
# tasks/outbox_publisher.py
from celery import shared_task
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
@shared_task
def publish_outbox_messages():
# 扫描 pending 消息,最多100条
messages = OutboxMessage.objects.filter(status="pending")[:100]
for msg in messages:
try:
producer.send(
topic=msg.topic,
value=json.dumps(msg.payload).encode('utf-8')
)
msg.status = "sent"
msg.save(update_fields=["status"])
except Exception as e:
# 记录日志,下次重试
logger.error(f"Failed to send message {msg.id}: {e}")
# 不更新 status,下次继续重试
这个任务每 30 秒跑一次,配合 K8s 的 CronJob 部署,稳如老狗。
第三步:营销服务消费(务必幂等!)
# consumers/coupon_consumer.py
def on_coupon_issue_message(payload):
user_id = payload["user_id"]
order_id = payload["order_id"]
# 幂等检查:是否已为该订单发过券?
if Coupon.objects.filter(order_id=order_id).exists():
return # 直接返回,避免重复发券
try:
# 发券逻辑
coupon = Coupon.objects.create(
user_id=user_id,
order_id=order_id,
type=payload["coupon_type"],
status="active"
)
except Exception as e:
# 如果发券失败,触发补偿:通知积分服务取消预占
points_service.cancel_deduction(user_id, order_id)
raise e # 让 Kafka 重试
运维与监控:别等用户投诉才救火
上线前,我们做了三件事:
- 埋点监控:在 Grafana 里建了个面板,实时看
outbox表积压量。超过 100 条就告警。 - 人工干预接口:加了个管理后台,支持手动重发或标记失败消息。
- 混沌测试:故意 kill Kafka 或营销服务 Pod,验证补偿流程是否生效。
| 指标 | 正常值 | 告警阈值 |
|---|---|---|
| Outbox 积压消息数 | < 50 | > 100 |
| 消息投递成功率 | > 99.9% | < 99% |
| 补偿任务执行延迟 | < 1min | > 5min |
事实证明,这套方案扛住了双11当天 5w+/h 的订单峰值,0 起资损事故。运维同学终于不用半夜被 PagerDuty 叫醒了(虽然他还在吐槽我写的日志不够结构化)。
心得:技术没有银弹,只有权衡
回头看,这个方案不是最“高级”的,但对我们团队来说是最合适的。作为前产品经理,我现在深刻理解了什么叫“技术债”——当年一句“很简单”,可能让现在的自己熬三个通宵。
如果你也在用 Python 搞微服务,我的建议是:
- 别迷信新框架:Seata 很酷,但你的团队能驾驭吗?
- 优先保证数据最终一致:强一致性往往代价太高
- 补偿机制比回滚更实用:用户宁愿晚点拿到券,也不愿钱白扣
最后,感谢我的耳机和 Lo-fi 歌单,陪我度过了无数个 debug 的夜晚。也感谢那位“简单需求”的产品经理——虽然我已经把他微信拉黑了 😏
技术人的成长,就是不断填自己或别人挖的坑。但只要系统稳了,用户笑了,这班加得就值。
(完)

评论 0