分布式事务太难?用 Python 手把手带你搞定最佳实践

乐观锁玩家
2025-12-24 22:27
阅读 348

大家好,我是一名工作 5 年的后端开发工程师,也带过不少新人。最近面试时发现,很多同学对“分布式事务”这个词一脸懵——既说不清它是什么,也不知道怎么解决。其实我当初学的时候也卡在这儿很久,翻文档、看论文、试错无数次,才慢慢搞明白。

今天这篇教程,就是想用最直白的语言 + 最简单的 Python 代码,带你从零理解分布式事务的核心思想和实战方案。无论你是刚学编程的小白,还是准备面试的求职者,都能跟着一步步动手做出来。

💡 本文特别适合:

  • 完全没接触过分布式系统的新手
  • 正在准备后端/架构类面试题(比如“如何保证跨服务数据一致性?”)
  • 想用 Python 实践分布式事务的同学

一、什么是分布式事务?为什么需要它?

先别被“事务”吓到!你可以把它想象成“一组操作必须全部成功,或者全部失败”。

单机事务 vs 分布式事务

  • 单机事务(比如 MySQL 的 BEGIN...COMMIT):所有操作都在同一个数据库里,很容易保证一致性。
  • 分布式事务:你的系统拆成了多个服务(比如用户服务、订单服务、库存服务),每个服务有自己的数据库。这时候,一个业务操作可能要同时改多个库——万一中间某个步骤失败了,其他库已经改了,数据就乱了!

举个栗子🌰:

用户下单 → 扣库存 + 创建订单 + 扣余额
如果“扣库存”成功了,但“创建订单”失败了,那库存白白少了,钱也没收,老板要哭了 😭

所以,分布式事务的目标就是:让跨服务的操作像单机事务一样可靠


二、环境准备:3 分钟搭好 Python 开发环境

我们不需要复杂的中间件,只用 Python + 两个 SQLite 数据库(模拟两个服务),就能演示核心逻辑。

所需工具

工具 版本 说明
Python ≥3.8 推荐 3.10+
pip 自带 包管理器
sqlite3 自带 轻量级数据库

安装依赖

pip install flask sqlalchemy

✅ Flask 用来模拟两个微服务
✅ SQLAlchemy 是 ORM,方便操作数据库

创建项目结构

mkdir distributed-tx-demo
cd distributed-tx-demo
touch order_service.py inventory_service.py main.py

三、核心概念:三大主流方案一句话讲清

分布式事务有几种经典解法,新手最容易混淆。我用大白话总结:

方案 核心思想 适用场景 缺点
2PC(两阶段提交) “先问大家能不能提交,都同意再真提交” 强一致性要求高 性能差、阻塞风险
TCC(Try-Confirm-Cancel) “先冻结资源,再确认或回滚” 金融、支付等关键业务 开发复杂度高
Saga 模式 “每个操作配一个补偿操作,失败就挨个回滚” 长流程、最终一致性 不能保证隔离性

🎯 对于大多数互联网应用(比如电商、社交),Saga 是最实用的选择,也是我们今天要实战的方案!


四、实战:用 Python 实现一个 Saga 分布式事务

我们要实现一个简化版的“下单”流程:

  1. 调用库存服务 → 扣减商品数量
  2. 调用订单服务 → 创建订单

如果第 2 步失败,就调用库存服务的“补偿接口”把库存加回去。

第一步:写库存服务(inventory_service.py)

from flask import Flask, request, jsonify
import sqlite3

app = Flask(__name__)

# 初始化库存数据库
def init_db():
    conn = sqlite3.connect('inventory.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY,
            stock INTEGER
        )
    ''')
    # 初始库存:商品1 有 100 件
    cursor.execute("INSERT OR IGNORE INTO products (id, stock) VALUES (1, 100)")
    conn.commit()
    conn.close()

@app.route('/deduct', methods=['POST'])
def deduct_stock():
    data = request.json
    product_id = data['product_id']
    quantity = data['quantity']
    
    conn = sqlite3.connect('inventory.db')
    cursor = conn.cursor()
    cursor.execute("UPDATE products SET stock = stock - ? WHERE id = ?", (quantity, product_id))
    conn.commit()
    conn.close()
    return jsonify({"status": "success"})

@app.route('/compensate', methods=['POST'])
def compensate_stock():
    """补偿操作:把库存加回去"""
    data = request.json
    product_id = data['product_id']
    quantity = data['quantity']
    
    conn = sqlite3.connect('inventory.db')
    cursor = conn.cursor()
    cursor.execute("UPDATE products SET stock = stock + ? WHERE id = ?", (quantity, product_id))
    conn.commit()
    conn.close()
    return jsonify({"status": "compensated"})

if __name__ == '__main__':
    init_db()
    app.run(port=5001)

第二步:写订单服务(order_service.py)

from flask import Flask, request, jsonify
import sqlite3
import random  # 模拟偶尔失败

app = Flask(__name__)

def init_db():
    conn = sqlite3.connect('orders.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            product_id INTEGER
        )
    ''')
    conn.commit()
    conn.close()

@app.route('/create', methods=['POST'])
def create_order():
    # 模拟 30% 概率失败(测试补偿逻辑)
    if random.random() < 0.3:
        return jsonify({"error": "Order creation failed!"}), 500
    
    data = request.json
    user_id = data['user_id']
    product_id = data['product_id']
    
    conn = sqlite3.connect('orders.db')
    cursor = conn.cursor()
    cursor.execute("INSERT INTO orders (user_id, product_id) VALUES (?, ?)", (user_id, product_id))
    conn.commit()
    conn.close()
    return jsonify({"status": "order created"})

if __name__ == '__main__':
    init_db()
    app.run(port=5002)

第三步:主协调器(main.py)—— Saga 的大脑

import requests
import time

def place_order(user_id, product_id, quantity):
    print("🚀 开始下单流程...")
    
    # Step 1: 扣库存
    try:
        resp = requests.post('http://localhost:5001/deduct', json={
            'product_id': product_id,
            'quantity': quantity
        })
        resp.raise_for_status()
        print("✅ 库存已扣减")
    except Exception as e:
        print(f"❌ 扣库存失败: {e}")
        return False

    # Step 2: 创建订单
    try:
        resp = requests.post('http://localhost:5002/create', json={
            'user_id': user_id,
            'product_id': product_id
        })
        resp.raise_for_status()
        print("✅ 订单创建成功!")
        return True
    except Exception as e:
        print(f"❌ 创建订单失败: {e}")
        # 触发补偿:把库存加回去!
        try:
            requests.post('http://localhost:5001/compensate', json={
                'product_id': product_id,
                'quantity': quantity
            })
            print("🔄 已执行补偿:库存恢复")
        except Exception as ce:
            print(f"⚠️ 补偿也失败了!需人工介入: {ce}")
        return False

if __name__ == '__main__':
    # 先启动两个服务(在另外两个终端运行 inventory_service.py 和 order_service.py)
    time.sleep(2)
    success = place_order(user_id=123, product_id=1, quantity=1)
    if success:
        print("\n🎉 整个事务成功!")
    else:
        print("\n💥 事务失败,已尝试补偿")

运行测试

  1. 终端1:python inventory_service.py
  2. 终端2:python order_service.py
  3. 终端3:python main.py

你会看到两种结果:

  • 大概率成功(70%)
  • 小概率失败并自动补偿(30%,因为我们加了随机失败)

🔍 关键点:即使订单创建失败,库存也会通过 /compensate 接口恢复,保证最终一致性!


五、新手常见问题 & 避坑指南

Q1:Saga 能保证“强一致性”吗?

不能! 它是“最终一致性”。在补偿完成前,数据可能短暂不一致(比如库存少了但订单没建)。适合能容忍短时不一致的场景。

Q2:补偿操作失败怎么办?

这是 Saga 的难点!建议:

  • 补偿接口必须幂等(重复调用结果不变)
  • 加重试机制(比如指数退避)
  • 最终失败时告警,人工处理

Q3:有没有现成的框架?

有!Python 中可用:

  • Celery + Redis:用任务队列实现异步 Saga
  • Eventuate Tram(Java 更成熟) 但理解原理比用框架更重要!

Q4:面试常问什么?

高频面试题:

  • “分布式事务有哪些方案?各有什么优劣?”
  • “Saga 模式如何处理并发问题?”(答:通常靠业务层加锁或版本号)
  • “补偿操作怎么设计?”

💬 我面试时最爱问:“如果补偿接口本身失败了,你的系统会怎样?” —— 能答出“重试+告警+人工兜底”的,基本就过了。


六、下一步学习建议

你已经掌握了分布式事务的核心思想!接下来可以:

  1. 深入 Saga:学习如何用消息队列(如 RabbitMQ/Kafka)实现事件驱动的 Saga
  2. 试试 TCC:用 Python 模拟 Try/Confirm/Cancel 三个接口
  3. 了解 Seata:阿里开源的分布式事务框架(虽然主要是 Java,但思想通用)
  4. 读经典论文:《Sagas》 by Hector Garcia-Molina (1987)

🌟 最后送你一句话:分布式系统的本质,不是追求完美,而是在不完美中找平衡。

希望这篇教程帮你少走弯路。如果你觉得有用,欢迎分享给正在挣扎的小伙伴!

— 一个踩过无数坑的后端老兵

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝