分布式事务不再难:零基础也能掌握的最佳实践指南

半个架构师
2026-02-20 11:08
阅读 280

大家好,我是开源项目维护者,也是一名后端讲师。这些年,我写过上百篇技术文档,参与过多个高并发系统的架构设计。每次看到新手同学在“分布式事务”这个概念前望而却步,我就特别想写一篇真正从零开始的教程。

我当初学的时候,也曾在“ACID”“CAP”“TCC”这些术语里绕晕了头。那时候资料要么太理论,要么直接上源码,根本没人告诉我:“别慌,其实你可以一步一步来。”

今天这篇文章,就是为完全零基础的朋友准备的。我们会用最简单的语言、最贴近现实的例子,配合 Python 代码和实用工具,带你亲手实现一个分布式事务场景。哪怕你刚学完 Python 基础语法,也能跟得上!


为什么我们需要分布式事务?

想象一下:你正在开发一个电商系统,用户下单时需要做两件事:

  1. 扣减库存(调用库存服务)
  2. 创建订单(调用订单服务)

这两个操作必须同时成功或同时失败。如果只扣了库存但没生成订单,那商品就“凭空消失”了;反之,如果生成了订单但没扣库存,就会超卖。

在单体应用中,这很简单——用数据库事务就能保证。但在微服务架构下,库存和订单可能运行在不同服务器、使用不同数据库,传统的本地事务失效了。这就是分布式事务要解决的问题。

💡 简单说:分布式事务 = 跨多个服务/数据库的“原子操作”。


环境准备:5分钟搭好实验环境

我们不需要复杂的中间件,用纯 Python + 轻量工具就能模拟分布式场景。

所需工具清单

工具 用途 安装命令
Python 3.8+ 编程语言 官网下载
pip 包管理器 内置
fastapi 快速构建 API 服务 pip install fastapi uvicorn
requests 模拟服务间调用 pip install requests
sqlite3 轻量数据库(内置) 无需安装

📌 提示:如果你连 Python 都没装过,请先完成 Python 官方入门教程 的前两章。

创建项目结构

mkdir distributed-tx-demo
cd distributed-tx-demo
touch inventory_service.py order_service.py main_client.py

这三个文件将分别代表:

  • inventory_service.py:库存服务
  • order_service.py:订单服务
  • main_client.py:客户端,发起下单请求

核心概念:用买奶茶讲清楚分布式事务

别被术语吓到!我们用一个生活例子理解关键方案。

场景还原:小明买奶茶

小明去奶茶店,要完成两步:

  1. 支付(通过支付宝)
  2. 取奶茶(店员制作)

理想情况:支付成功 → 拿到奶茶。
异常情况:支付成功但店员罢工了?或者支付失败却给了奶茶?

分布式事务的主流方案,其实就是不同“处理异常”的策略:

方案 类比解释 适用场景
两阶段提交 (2PC) 小明先问:“能做吗?” 店员说“能”,再真正支付 强一致性要求高,性能要求低
TCC(Try-Confirm-Cancel) 先冻结金额(Try),成功则扣款(Confirm),失败则解冻(Cancel) 高并发,可自定义补偿逻辑
Saga 模式 每步成功就记录,失败时逐个“撤销”前面的操作 长流程,允许最终一致性
消息队列 + 本地消息表 支付成功后发消息:“请做奶茶”,店员收到后执行 解耦服务,异步处理

✨ 本文重点实践 TCC 模式消息队列方案,因为它们在实际项目中最常用,也最容易用 Python 实现。


实战一:用 TCC 模式实现下单逻辑

TCC 分三步:

  • Try:尝试预留资源(如冻结库存)
  • Confirm:确认操作(真正扣减)
  • Cancel:取消预留(释放冻结)

步骤 1:编写库存服务(支持 TCC)

# inventory_service.py
from fastapi import FastAPI, HTTPException
import sqlite3

app = FastAPI()

# 初始化数据库
def init_db():
    conn = sqlite3.connect('inventory.db')
    conn.execute('''CREATE TABLE IF NOT EXISTS stock (
        product_id TEXT PRIMARY KEY,
        available INTEGER,
        frozen INTEGER
    )''')
    # 插入测试数据:product_001 有 10 件库存
    conn.execute("INSERT OR IGNORE INTO stock VALUES ('product_001', 10, 0)")
    conn.commit()
    conn.close()

@app.post("/try_reserve")
def try_reserve(product_id: str, quantity: int):
    conn = sqlite3.connect('inventory.db')
    cur = conn.cursor()
    cur.execute("SELECT available, frozen FROM stock WHERE product_id=?", (product_id,))
    row = cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Product not found")
    
    available, frozen = row
    if available < quantity:
        raise HTTPException(status_code=400, detail="Insufficient stock")
    
    # 冻结库存
    cur.execute(
        "UPDATE stock SET frozen = frozen + ?, available = available - ? WHERE product_id=?",
        (quantity, quantity, product_id)
    )
    conn.commit()
    conn.close()
    return {"status": "reserved"}

@app.post("/confirm")
def confirm(product_id: str, quantity: int):
    # 实际扣减:已冻结的库存转为已售出(此处简化,直接丢弃冻结数)
    conn = sqlite3.connect('inventory.db')
    conn.execute(
        "UPDATE stock SET frozen = frozen - ? WHERE product_id=?",
        (quantity, product_id)
    )
    conn.commit()
    conn.close()
    return {"status": "confirmed"}

@app.post("/cancel")
def cancel(product_id: str, quantity: int):
    # 取消冻结:把冻结的库存还回去
    conn = sqlite3.connect('inventory.db')
    conn.execute(
        "UPDATE stock SET frozen = frozen - ?, available = available + ? WHERE product_id=?",
        (quantity, quantity, product_id)
    )
    conn.commit()
    conn.close()
    return {"status": "cancelled"}

if __name__ == "__main__":
    init_db()
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8001)

步骤 2:编写订单服务

# order_service.py
from fastapi import FastAPI, HTTPException
import sqlite3

app = FastAPI()

def init_db():
    conn = sqlite3.connect('orders.db')
    conn.execute('''CREATE TABLE IF NOT EXISTS orders (
        order_id TEXT PRIMARY KEY,
        product_id TEXT,
        status TEXT
    )''')
    conn.commit()
    conn.close()

@app.post("/create")
def create_order(order_id: str, product_id: str):
    conn = sqlite3.connect('orders.db')
    try:
        conn.execute(
            "INSERT INTO orders (order_id, product_id, status) VALUES (?, ?, 'created')",
            (order_id, product_id)
        )
        conn.commit()
        return {"status": "order_created"}
    except sqlite3.IntegrityError:
        raise HTTPException(status_code=400, detail="Order already exists")
    finally:
        conn.close()

@app.post("/confirm")
def confirm_order(order_id: str):
    conn = sqlite3.connect('orders.db')
    conn.execute("UPDATE orders SET status='confirmed' WHERE order_id=?", (order_id,))
    conn.commit()
    conn.close()
    return {"status": "order_confirmed"}

@app.post("/cancel")
def cancel_order(order_id: str):
    conn = sqlite3.connect('orders.db')
    conn.execute("DELETE FROM orders WHERE order_id=?", (order_id,))
    conn.commit()
    conn.close()
    return {"status": "order_cancelled"}

if __name__ == "__main__":
    init_db()
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8002)

步骤 3:客户端协调 TCC 流程

# main_client.py
import requests
import time
import uuid

INVENTORY_URL = "http://127.0.0.1:8001"
ORDER_URL = "http://127.0.0.1:8002"

def place_order(product_id: str, quantity: int):
    order_id = str(uuid.uuid4())
    print(f"下单开始,订单ID: {order_id}")
    
    # Step 1: Try 阶段
    try:
        print("→ 尝试冻结库存...")
        requests.post(f"{INVENTORY_URL}/try_reserve", 
                     json={"product_id": product_id, "quantity": quantity}, timeout=2)
        print("✓ 库存冻结成功")
        
        print("→ 尝试创建订单...")
        requests.post(f"{ORDER_URL}/create", 
                     json={"order_id": order_id, "product_id": product_id}, timeout=2)
        print("✓ 订单创建成功")
        
    except Exception as e:
        print(f"✗ Try 阶段失败: {e}")
        # 执行 Cancel
        _cancel_all(order_id, product_id, quantity)
        return False
    
    # Step 2: Confirm 阶段
    try:
        print("→ 确认库存扣减...")
        requests.post(f"{INVENTORY_URL}/confirm", 
                     json={"product_id": product_id, "quantity": quantity})
        print("→ 确认订单...")
        requests.post(f"{ORDER_URL}/confirm", 
                     json={"order_id": order_id})
        print("✅ 下单成功!")
        return True
        
    except Exception as e:
        print(f"✗ Confirm 阶段失败: {e}")
        # 注意:Confirm 失败通常需要人工介入或重试机制
        # 这里简化处理,仅打印错误
        return False

def _cancel_all(order_id, product_id, quantity):
    """统一取消所有 Try 操作"""
    try:
        requests.post(f"{INVENTORY_URL}/cancel", 
                     json={"product_id": product_id, "quantity": quantity})
        print("✓ 库存冻结已取消")
    except:
        print("⚠️ 库存取消失败(可能已自动过期)")
    
    try:
        requests.post(f"{ORDER_URL}/cancel", 
                     json={"order_id": order_id})
        print("✓ 订单已取消")
    except:
        print("⚠️ 订单取消失败")

if __name__ == "__main__":
    # 启动前请先运行 inventory_service.py 和 order_service.py
    place_order("product_001", 2)

运行测试

  1. 终端 1:python inventory_service.py
  2. 维终端 2:python order_service.py
  3. 终端 3:python main_client.py

你会看到类似输出:

下单开始,订单ID: a1b2c3...
→ 尝试冻结库存...
✓ 库存冻结成功
→ 尝试创建订单...
✓ 订单创建成功
→ 确认库存扣减...
→ 确认订单...
✅ 下单成功!

🔍 查看数据库:sqlite3 inventory.db "SELECT * FROM stock;" 会发现 available=8, frozen=0,说明事务成功。


实战二:用消息队列实现最终一致性(更轻量)

TCC 需要每个服务都实现三套接口,成本较高。另一种思路是:用消息驱动

核心思想:

  1. 订单服务创建“待支付”订单
  2. 发送消息到队列:“请扣库存”
  3. 库存服务消费消息,执行扣减
  4. 如果失败,消息重试,直到成功(最终一致)

我们用 Python 内置队列模拟消息中间件(生产环境可用 RabbitMQ/Kafka)。

修改订单服务(发送消息)

# 在 order_service.py 中增加
from queue import Queue
import threading

# 全局消息队列(模拟)
MESSAGE_QUEUE = Queue()

@app.post("/create_and_notify")
def create_and_notify(order_id: str, product_id: str, quantity: int):
    # 1. 创建订单(状态为 pending)
    conn = sqlite3.connect('orders.db')
    conn.execute(
        "INSERT INTO orders (order_id, product_id, status) VALUES (?, ?, 'pending')",
        (order_id, product_id)
    )
    conn.commit()
    conn.close()
    
    # 2. 发送消息
    MESSAGE_QUEUE.put({
        "order_id": order_id,
        "product_id": product_id,
        "quantity": quantity
    })
    return {"status": "order_pending", "message_queued": True}

库存服务监听消息

# 在 inventory_service.py 中增加
import threading
import time

def consume_messages():
    """后台线程:消费消息并扣库存"""
    while True:
        if not MESSAGE_QUEUE.empty():
            msg = MESSAGE_QUEUE.get()
            print(f"📦 收到消息: 扣减 {msg['quantity']} 件 {msg['product_id']}")
            
            # 模拟扣库存(简化:直接扣减,不冻结)
            conn = sqlite3.connect('inventory.db')
            cur = conn.cursor()
            cur.execute("SELECT available FROM stock WHERE product_id=?", (msg["product_id"],))
            available = cur.fetchone()[0]
            
            if available >= msg["quantity"]:
                conn.execute(
                    "UPDATE stock SET available = available - ? WHERE product_id=?",
                    (msg["quantity"], msg["product_id"])
                )
                conn.commit()
                print("✅ 库存扣减成功")
            else:
                print("❌ 库存不足,消息将重试...")
                # 重新入队(模拟重试)
                MESSAGE_QUEUE.put(msg)
                time.sleep(2)  # 等待2秒再试
            
            conn.close()
        time.sleep(0.5)

# 启动消费线程
threading.Thread(target=consume_messages, daemon=True).start()

💡 这种方式代码更简单,适合对一致性要求不是“强实时”的场景(如积分、日志同步)。


新手常见问题解答

Q1:为什么不用数据库的 XA 事务?

XA 是标准的 2PC 协议,但性能差、锁时间长,且很多 NoSQL 不支持。现代互联网系统更倾向柔性事务(如 TCC、Saga)。

Q2:Confirm 或 Cancel 失败怎么办?

  • Cancel 失败:通常设计为幂等操作(多次调用结果一致),或设置过期自动清理。
  • Confirm 失败:需要监控告警 + 人工介入。高级方案可用“最大努力通知”。

Q3:Kimi 在这里有什么用?

你可能注意到文章提到了 Kimi。作为 AI 编程助手,Kimi 能帮你:

  • 自动生成 TCC 接口模板
  • 解释分布式事务的异常场景
  • 推荐适合你业务的方案

例如,在 Kimi 中输入:“用 Python 写一个 TCC 库存服务示例”,它能快速给出结构清晰的代码框架,节省你查文档的时间。

Q4:如何测试分布式事务的异常?

main_client.py 中故意注释掉某一步,比如:

# requests.post(f"{ORDER_URL}/create", ...)  # 模拟订单服务宕机

观察 Cancel 是否正确触发。


下一步学习建议

你已经掌握了分布式事务的核心思想!接下来可以:

  1. 深入原理:阅读《Designing Data-Intensive Applications》第9章
  2. 实战进阶
    • 用 Seata(开源分布式事务框架)替换手动 TCC
    • 集成 RabbitMQ 实现可靠消息
  3. 工具推荐
    • Locust:压测你的事务接口
    • Prometheus + Grafana:监控事务成功率
    • Kimi:辅助理解复杂场景(如“网络分区下的事务行为”)

🌟 最后提醒:没有“银弹”。选择方案前,先问自己:

  • 我的业务能容忍多久的不一致?
  • 回滚成本高吗?
  • 团队能否维护复杂补偿逻辑?

分布式事务不是魔法,而是一套权衡的艺术。希望这篇教程能成为你架构之路的第一块基石。

动手试试吧!代码跑通的那一刻,你会发现:原来“高大上”的概念,也不过是几行清晰的逻辑而已。

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝