分布式事务解决方案:从“产品狗”到“码农”的血泪实战

代码洁癖患者
2025-12-14 12:05
阅读 395

上周五晚上九点半,我瘫在工位上,耳机里放着 Lo-fi Hip Hop(别笑,边听音乐边写代码真的能救命),盯着 K8s 集群里疯狂重启的 Pod,心里默默问候了产品经理祖宗十八代。事情起因很简单:我们新上的“会员积分+优惠券”联合营销活动,用户下单时要同时扣积分、发券、更新订单状态——三个服务,三个数据库,结果双11预热第一天就炸了。

作为那个曾经坐在会议室里画流程图、喊“这个需求很简单”的前产品经理,如今亲手被自己当年埋下的雷炸飞,只能说:因果循环,报应不爽。

但骂归骂,锅还得背。毕竟现在我是后端工程师,而且还是个自诩“懂产品”的技术人。今天这篇,就聊聊我们在北京这间加班文化浓厚的互联网公司里,如何用 Python + 云原生架构搞定分布式事务这个“烫手山芋”。


背景:不是所有“简单需求”都简单

事情发生在去年 Q3。产品团队拍脑袋决定搞个“积分抵现 + 自动发券”联动玩法,美其名曰“提升用户 LTV”。技术评审会上,我看着那张花里胡哨的流程图,冷汗直流:

  • 用户下单 → 扣减账户积分(积分服务)
  • 同时生成一张专属优惠券(营销服务)
  • 再更新订单状态为“已支付”(订单服务)

三个独立微服务,三个 MySQL 实例,跨库操作。更要命的是,不能出现“扣了积分但没发券”或者“发了券但订单失败” 的情况——否则用户投诉分分钟打爆客服电话。

测试同学当场灵魂拷问:“如果发券服务挂了,积分回滚吗?”
我弱弱回答:“呃……目前没有。”

那一刻,我知道,分布式事务这课,迟早得补。


方案选型:别被理论绕晕,先看落地成本

市面上分布式事务方案不少:2PC、TCC、Saga、本地消息表、Seata……听起来都很牛,但对我们这种小团队(5个后端 + 2个运维 + 我这个“半路出家”的),综合考量下来,必须满足几个现实条件:

  1. 学习曲线平缓:不能让我这种刚转码一年的人看三天文档还配不对
  2. 对现有代码侵入小:Python 项目已经跑了一年多,重构成本太高
  3. 云原生友好:我们全栈跑在 K8s 上,最好能和 Operator/Service Mesh 融合
  4. 有可观测性:线上出问题能快速定位,别让用户等三天才知道券没发

对比一圈,最终我们选择了 “可靠消息最终一致性” + 补偿机制 的组合拳。为什么?因为:

  • 不依赖全局锁,性能损耗小
  • 消息队列我们已经在用(Kafka)
  • 补偿逻辑可以复用现有的任务重试框架

当然,也有人推荐 Seata,但它的 AT 模式对 MySQL 有特定要求,且 Python 客户端生态不如 Java 成熟。作为务实派,我们选择了更“接地气”的方案。


实战:用 Python 实现可靠消息事务

核心思路就两点:

  1. 本地事务 + 消息落库:先在本地事务中完成业务操作,并将待发送的消息写入同一数据库的 outbox
  2. 异步投递 + 幂等消费:后台任务扫描 outbox,投递消息;消费者做幂等处理

第一步:改造订单服务(关键!)

# models.py
class Order(models.Model):
    user_id = models.IntegerField()
    status = models.CharField(max_length=20)
    # ...其他字段

class OutboxMessage(models.Model):
    topic = models.CharField(max_length=100)
    payload = models.JSONField()  # 存序列化后的消息体
    status = models.CharField(max_length=20, default="pending")  # pending / sent
    created_at = models.DateTimeField(auto_now_add=True)
# services/order_service.py
from django.db import transaction
import json

def create_order_with_points(user_id, amount):
    with transaction.atomic():
        # 1. 扣减积分(调用积分服务 API,但先不 commit)
        points_service.deduct_points(user_id, amount)

        # 2. 创建订单
        order = Order.objects.create(user_id=user_id, status="paid")

        # 3. 写入 outbox 消息(关键!和订单在同一事务)
        OutboxMessage.objects.create(
            topic="coupon.issue",
            payload={
                "user_id": user_id,
                "order_id": order.id,
                "coupon_type": "WELCOME_20"
            }
        )
        return order

注意:这里 points_service.deduct_points 实际是 HTTP 调用,但我们在积分服务端做了预占机制(类似 TCC 的 Try 阶段),只有收到确认才真正扣减。如果后续发券失败,我们会调用 /cancel 接口释放预占积分。

第二步:后台投递任务(跑在 Celery 里)

# tasks/outbox_publisher.py
from celery import shared_task
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['kafka:9092'])

@shared_task
def publish_outbox_messages():
    # 扫描 pending 消息,最多100条
    messages = OutboxMessage.objects.filter(status="pending")[:100]
    for msg in messages:
        try:
            producer.send(
                topic=msg.topic,
                value=json.dumps(msg.payload).encode('utf-8')
            )
            msg.status = "sent"
            msg.save(update_fields=["status"])
        except Exception as e:
            # 记录日志,下次重试
            logger.error(f"Failed to send message {msg.id}: {e}")
            # 不更新 status,下次继续重试

这个任务每 30 秒跑一次,配合 K8s 的 CronJob 部署,稳如老狗。

第三步:营销服务消费(务必幂等!)

# consumers/coupon_consumer.py
def on_coupon_issue_message(payload):
    user_id = payload["user_id"]
    order_id = payload["order_id"]

    # 幂等检查:是否已为该订单发过券?
    if Coupon.objects.filter(order_id=order_id).exists():
        return  # 直接返回,避免重复发券

    try:
        # 发券逻辑
        coupon = Coupon.objects.create(
            user_id=user_id,
            order_id=order_id,
            type=payload["coupon_type"],
            status="active"
        )
    except Exception as e:
        # 如果发券失败,触发补偿:通知积分服务取消预占
        points_service.cancel_deduction(user_id, order_id)
        raise e  # 让 Kafka 重试

运维与监控:别等用户投诉才救火

上线前,我们做了三件事:

  1. 埋点监控:在 Grafana 里建了个面板,实时看 outbox 表积压量。超过 100 条就告警。
  2. 人工干预接口:加了个管理后台,支持手动重发或标记失败消息。
  3. 混沌测试:故意 kill Kafka 或营销服务 Pod,验证补偿流程是否生效。
指标 正常值 告警阈值
Outbox 积压消息数 < 50 > 100
消息投递成功率 > 99.9% < 99%
补偿任务执行延迟 < 1min > 5min

事实证明,这套方案扛住了双11当天 5w+/h 的订单峰值,0 起资损事故。运维同学终于不用半夜被 PagerDuty 叫醒了(虽然他还在吐槽我写的日志不够结构化)。


心得:技术没有银弹,只有权衡

回头看,这个方案不是最“高级”的,但对我们团队来说是最合适的。作为前产品经理,我现在深刻理解了什么叫“技术债”——当年一句“很简单”,可能让现在的自己熬三个通宵。

如果你也在用 Python 搞微服务,我的建议是:

  • 别迷信新框架:Seata 很酷,但你的团队能驾驭吗?
  • 优先保证数据最终一致:强一致性往往代价太高
  • 补偿机制比回滚更实用:用户宁愿晚点拿到券,也不愿钱白扣

最后,感谢我的耳机和 Lo-fi 歌单,陪我度过了无数个 debug 的夜晚。也感谢那位“简单需求”的产品经理——虽然我已经把他微信拉黑了 😏

技术人的成长,就是不断填自己或别人挖的坑。但只要系统稳了,用户笑了,这班加得就值。

(完)

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝