分布式事务不再难:手把手带你用Python实现可靠系统

孙玉
2026-05-29 05:37
阅读 660

大家好,我是一名开源项目维护者,也是一名后端讲师。过去几年里,我参与维护了多个高并发的微服务系统,也写过不少技术文档。今天之所以要写这篇教程,是因为我当初学分布式事务时踩过太多坑——概念抽象、资料晦涩、代码复杂,让人望而却步。其实,只要从实践出发,零基础也能搞懂它。

这篇文章不讲大道理,只带你动手做。我们会用 Python 写一个简单的订单-库存系统,并通过真实代码理解“分布式事务”到底怎么保证数据一致。即使你刚学完 Python 基础,也能跟得上!

为什么需要分布式事务?

想象一下:你在电商平台下单买书。系统要做两件事:

  1. 扣减库存(库存服务)
  2. 创建订单(订单服务)

这两个操作分别在两个不同的数据库里执行。如果第一步成功了,第二步却失败了(比如网络中断),就会出现“库存少了但没生成订单”的问题——用户白亏,商家也亏。

单体应用中,我们可以用数据库事务(BEGIN...COMMIT)保证原子性。但在分布式系统中,多个服务各自管理自己的数据库,传统事务就失效了。

这时候,就需要分布式事务解决方案来协调多个服务,确保“要么全成功,要么全回滚”。

💡 小知识:分布式事务 ≠ 数据库事务。它是跨服务、跨数据库的一致性保障机制。


环境准备:5分钟搭好开发环境

我们用最轻量的方式搭建实验环境,不需要 Docker 或复杂中间件。

所需工具清单

工具 版本要求 安装方式
Python ≥3.8 官网下载或 pyenv
pip 最新 通常随 Python 自带
Flask 最新 pip install flask
SQLite 内置 无需安装
requests 最新 pip install requests

✅ 提示:本文所有代码兼容 Python 3.8+,无需虚拟环境也可运行(但建议使用 venv 隔离依赖)。

创建项目结构

mkdir distributed-tx-demo
cd distributed-tx-demo
touch inventory.py order.py main.py requirements.txt

然后安装依赖:

echo "flask\nrequests" > requirements.txt
pip install -r requirements.txt

搞定!接下来我们就要动手写服务了。


核心概念:用生活例子讲清楚三种方案

分布式事务有多种实现方式。对初学者来说,掌握以下三种就够了:

1. 两阶段提交(2PC)——像婚礼主持人

类比:主持人问新郎“你愿意吗?”(准备阶段),再问新娘“你愿意吗?”,只有两人都说“愿意”,才宣布结婚(提交阶段)。

特点

  • 强一致性
  • 性能差,容易阻塞
  • 适合数据库内部(如 MySQL XA),不适合业务服务

🚫 初学者慎用:2PC 在业务层实现复杂,且对服务侵入性强。

2. TCC(Try-Confirm-Cancel)——像订酒店

  • Try:预占房间(检查库存并冻结)
  • Confirm:真正入住(扣减库存)
  • Cancel:取消预订(释放冻结)

优点:灵活、高性能
缺点:每个服务都要写三套逻辑,开发成本高

3. 本地消息表 + 最终一致性 —— 我们的选择!

这是最适合初学者的方案。核心思想是:

把“跨服务调用”变成“本地事务 + 异步重试”

具体步骤:

  1. 在订单服务中,先写本地消息表(和创建订单在一个事务里)
  2. 后台任务轮询消息表,异步调用库存服务
  3. 如果调用失败,不断重试直到成功(最终一致)

✅ 优点:简单、可靠、不依赖外部中间件
✅ 适合 80% 的业务场景(如电商、支付)

👨‍🏫 我当初第一个上线的分布式系统就是用这个方案,至今稳定运行三年。


实战:用 Python 实现订单-库存系统

我们现在动手实现一个简化版系统,包含两个服务:

  • order.py:订单服务(含本地消息表)
  • inventory.py:库存服务

第一步:实现库存服务

# inventory.py
from flask import Flask, request, jsonify
import sqlite3

app = Flask(__name__)

def init_db():
    conn = sqlite3.connect('inventory.db')
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS stock (
        item_id TEXT PRIMARY KEY,
        quantity INTEGER
    )''')
    # 初始化一本书,库存为10
    c.execute("INSERT OR IGNORE INTO stock (item_id, quantity) VALUES ('book-001', 10)")
    conn.commit()
    conn.close()

@app.route('/deduct', methods=['POST'])
def deduct_stock():
    data = request.json
    item_id = data['item_id']
    amount = data['amount']
    
    conn = sqlite3.connect('inventory.db')
    c = conn.cursor()
    
    # 检查库存是否足够
    c.execute("SELECT quantity FROM stock WHERE item_id = ?", (item_id,))
    row = c.fetchone()
    if not row or row[0] < amount:
        return jsonify({"error": "Insufficient stock"}), 400
    
    # 扣减库存
    c.execute("UPDATE stock SET quantity = quantity - ? WHERE item_id = ?", (amount, item_id))
    conn.commit()
    conn.close()
    
    return jsonify({"status": "success"})

if __name__ == '__main__':
    init_db()
    app.run(port=5001)

启动命令:

python inventory.py

第二步:实现订单服务(含本地消息表)

# order.py
from flask import Flask, request, jsonify
import sqlite3
import threading
import time
import requests

app = Flask(__name__)

def init_db():
    conn = sqlite3.connect('order.db')
    c = conn.cursor()
    # 订单表
    c.execute('''CREATE TABLE IF NOT EXISTS orders (
        order_id TEXT PRIMARY KEY,
        item_id TEXT,
        amount INTEGER
    )''')
    # 本地消息表
    c.execute('''CREATE TABLE IF NOT EXISTS outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        order_id TEXT,
        status TEXT DEFAULT 'pending'  -- pending, success, failed
    )''')
    conn.commit()
    conn.close()

def send_to_inventory(order_id):
    """模拟异步发送消息(实际可用 Celery 或线程池)"""
    conn = sqlite3.connect('order.db')
    c = conn.cursor()
    c.execute("SELECT item_id, amount FROM orders WHERE order_id = ?", (order_id,))
    order = c.fetchone()
    conn.close()
    
    if not order:
        return
    
    item_id, amount = order
    
    try:
        resp = requests.post('http://localhost:5001/deduct', json={
            "item_id": item_id,
            "amount": amount
        }, timeout=3)
        
        if resp.status_code == 200:
            # 更新消息状态为成功
            conn = sqlite3.connect('order.db')
            c = conn.cursor()
            c.execute("UPDATE outbox SET status = 'success' WHERE order_id = ?", (order_id,))
            conn.commit()
            conn.close()
            print(f"[SUCCESS] Deducted stock for order {order_id}")
        else:
            raise Exception("Inventory service rejected")
            
    except Exception as e:
        print(f"[RETRYING] Failed to deduct stock for {order_id}: {e}")
        # 这里可以记录失败次数,超过阈值告警
        time.sleep(2)  # 简单重试
        send_to_inventory(order_id)  # 递归重试(生产环境改用队列)

@app.route('/create_order', methods=['POST'])
def create_order():
    data = request.json
    order_id = data['order_id']
    item_id = data['item_id']
    amount = data['amount']
    
    conn = sqlite3.connect('order.db')
    c = conn.cursor()
    
    try:
        # 1. 创建订单
        c.execute("INSERT INTO orders (order_id, item_id, amount) VALUES (?, ?, ?)",
                  (order_id, item_id, amount))
        # 2. 插入本地消息(同一个事务!)
        c.execute("INSERT INTO outbox (order_id) VALUES (?)", (order_id,))
        conn.commit()
        
        # 3. 异步处理消息(新开线程避免阻塞HTTP请求)
        threading.Thread(target=send_to_inventory, args=(order_id,)).start()
        
        return jsonify({"status": "order created, processing stock deduction"})
    except Exception as e:
        conn.rollback()
        return jsonify({"error": str(e)}), 500
    finally:
        conn.close()

# 启动后台重试任务(简化版)
def start_retry_worker():
    def worker():
        while True:
            time.sleep(5)
            conn = sqlite3.connect('order.db')
            c = conn.cursor()
            c.execute("SELECT order_id FROM outbox WHERE status = 'pending'")
            pending = c.fetchall()
            conn.close()
            for (order_id,) in pending:
                threading.Thread(target=send_to_inventory, args=(order_id,)).start()
    threading.Thread(target=worker, daemon=True).start()

if __name__ == '__main__':
    init_db()
    start_retry_worker()  # 启动重试线程
    app.run(port=5000)

启动命令:

python order.py

第三步:测试整个流程

打开第三个终端,发送下单请求:

curl -X POST http://localhost:5000/create_order \
  -H "Content-Type: application/json" \
  -d '{"order_id": "ord-123", "item_id": "book-001", "amount": 1}'

你会看到:

  • 订单服务返回成功
  • 终端打印 [SUCCESS] Deducted stock for order ord-123
  • 查询库存:sqlite3 inventory.db "SELECT * FROM stock;" → 数量变为9

✅ 这就是最终一致性:订单先创建,库存稍后扣减,但最终数据一致。


新手常见问题解答

Q1:为什么不用数据库的 XA 事务(2PC)?

XA 对业务代码侵入大,且要求所有参与方支持 XA 协议(很多 NoSQL 不支持)。本地消息表方案更通用、更易调试。

Q2:消息重复投递怎么办?库存被多扣?

好问题!这就是为什么库存服务必须幂等。我们的 /deduct 接口虽然没显式处理,但你可以改进:

  • 方案1:传入唯一请求 ID,服务端记录已处理的 ID
  • 方案2:用“设置新库存值”代替“扣减”,避免多次扣减

Q3:Claude Code 和 Prompt 工程在这里有什么用?

你可能注意到了文章开头提到的关键词。这里解释一下:

  • Claude Code 是 Anthropic 推出的 AI 编程助手,可帮助生成分布式事务的样板代码。

  • Prompt 工程 指如何有效提问 AI。例如,你可以这样问 Claude:

    “用 Python 写一个基于本地消息表的分布式事务 demo,包含订单和库存服务,使用 Flask 和 SQLite。”

    好的 prompt 能让 AI 输出更贴近你需求的代码(比如指定 v0 版本、避免复杂依赖)。

🔧 实践建议:初学者可先手写代码理解原理,再用 AI 工具加速开发。

Q4:生产环境能这么简单吗?

不能。真实系统还需要:

  • 消息表增加重试次数限制
  • 失败消息进入死信队列人工干预
  • 使用 RabbitMQ/Kafka 替代轮询
  • 加入监控和告警

核心思想不变——先保证本地事务,再异步同步。


下一步学习建议

你已经掌握了分布式事务的核心思路!接下来可以:

  1. 深入 TCC 模式:尝试改造当前系统,加入 Try/Confirm/Cancel 三个接口
  2. 引入消息队列:用 Redis 或 RabbitMQ 替代轮询,提升性能
  3. 学习 Saga 模式:适用于长流程事务(如旅行预订)
  4. 阅读开源项目:如 Seata(Java)、DTM(Go)的源码

📌 避坑指南:不要一上来就学 Seata 或 RocketMQ 事务消息!先掌握“本地消息表”这种朴素但有效的方案,再进阶。


结语

分布式事务听起来高大上,但本质是“把复杂问题拆解成简单步骤”。我们用不到 100 行 Python 代码,就实现了一个可靠的最终一致性系统。

记住:没有银弹,只有合适场景的解决方案。对大多数业务,本地消息表 + 幂等设计已经足够。

希望这篇实践驱动的教程能帮你迈出第一步。如果你跑通了代码,欢迎在评论区告诉我!也别忘了 star 你的第一个分布式项目 😄

作者:开源项目维护者 | 教学理念:先跑起来,再优化

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝