分布式事务不再难:零基础也能掌握的最佳实践指南
大家好,我是开源项目维护者,也是一名后端讲师。这些年,我写过上百篇技术文档,参与过多个高并发系统的架构设计。每次看到新手同学在“分布式事务”这个概念前望而却步,我就特别想写一篇真正从零开始的教程。
我当初学的时候,也曾在“ACID”“CAP”“TCC”这些术语里绕晕了头。那时候资料要么太理论,要么直接上源码,根本没人告诉我:“别慌,其实你可以一步一步来。”
今天这篇文章,就是为完全零基础的朋友准备的。我们会用最简单的语言、最贴近现实的例子,配合 Python 代码和实用工具,带你亲手实现一个分布式事务场景。哪怕你刚学完 Python 基础语法,也能跟得上!
为什么我们需要分布式事务?
想象一下:你正在开发一个电商系统,用户下单时需要做两件事:
- 扣减库存(调用库存服务)
- 创建订单(调用订单服务)
这两个操作必须同时成功或同时失败。如果只扣了库存但没生成订单,那商品就“凭空消失”了;反之,如果生成了订单但没扣库存,就会超卖。
在单体应用中,这很简单——用数据库事务就能保证。但在微服务架构下,库存和订单可能运行在不同服务器、使用不同数据库,传统的本地事务失效了。这就是分布式事务要解决的问题。
💡 简单说:分布式事务 = 跨多个服务/数据库的“原子操作”。
环境准备:5分钟搭好实验环境
我们不需要复杂的中间件,用纯 Python + 轻量工具就能模拟分布式场景。
所需工具清单
| 工具 | 用途 | 安装命令 |
|---|---|---|
| Python 3.8+ | 编程语言 | 官网下载 |
| pip | 包管理器 | 内置 |
fastapi |
快速构建 API 服务 | pip install fastapi uvicorn |
requests |
模拟服务间调用 | pip install requests |
sqlite3 |
轻量数据库(内置) | 无需安装 |
📌 提示:如果你连 Python 都没装过,请先完成 Python 官方入门教程 的前两章。
创建项目结构
mkdir distributed-tx-demo
cd distributed-tx-demo
touch inventory_service.py order_service.py main_client.py
这三个文件将分别代表:
inventory_service.py:库存服务order_service.py:订单服务main_client.py:客户端,发起下单请求
核心概念:用买奶茶讲清楚分布式事务
别被术语吓到!我们用一个生活例子理解关键方案。
场景还原:小明买奶茶
小明去奶茶店,要完成两步:
- 支付(通过支付宝)
- 取奶茶(店员制作)
理想情况:支付成功 → 拿到奶茶。
异常情况:支付成功但店员罢工了?或者支付失败却给了奶茶?
分布式事务的主流方案,其实就是不同“处理异常”的策略:
| 方案 | 类比解释 | 适用场景 |
|---|---|---|
| 两阶段提交 (2PC) | 小明先问:“能做吗?” 店员说“能”,再真正支付 | 强一致性要求高,性能要求低 |
| TCC(Try-Confirm-Cancel) | 先冻结金额(Try),成功则扣款(Confirm),失败则解冻(Cancel) | 高并发,可自定义补偿逻辑 |
| Saga 模式 | 每步成功就记录,失败时逐个“撤销”前面的操作 | 长流程,允许最终一致性 |
| 消息队列 + 本地消息表 | 支付成功后发消息:“请做奶茶”,店员收到后执行 | 解耦服务,异步处理 |
✨ 本文重点实践 TCC 模式 和 消息队列方案,因为它们在实际项目中最常用,也最容易用 Python 实现。
实战一:用 TCC 模式实现下单逻辑
TCC 分三步:
- Try:尝试预留资源(如冻结库存)
- Confirm:确认操作(真正扣减)
- Cancel:取消预留(释放冻结)
步骤 1:编写库存服务(支持 TCC)
# inventory_service.py
from fastapi import FastAPI, HTTPException
import sqlite3
app = FastAPI()
# 初始化数据库
def init_db():
conn = sqlite3.connect('inventory.db')
conn.execute('''CREATE TABLE IF NOT EXISTS stock (
product_id TEXT PRIMARY KEY,
available INTEGER,
frozen INTEGER
)''')
# 插入测试数据:product_001 有 10 件库存
conn.execute("INSERT OR IGNORE INTO stock VALUES ('product_001', 10, 0)")
conn.commit()
conn.close()
@app.post("/try_reserve")
def try_reserve(product_id: str, quantity: int):
conn = sqlite3.connect('inventory.db')
cur = conn.cursor()
cur.execute("SELECT available, frozen FROM stock WHERE product_id=?", (product_id,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Product not found")
available, frozen = row
if available < quantity:
raise HTTPException(status_code=400, detail="Insufficient stock")
# 冻结库存
cur.execute(
"UPDATE stock SET frozen = frozen + ?, available = available - ? WHERE product_id=?",
(quantity, quantity, product_id)
)
conn.commit()
conn.close()
return {"status": "reserved"}
@app.post("/confirm")
def confirm(product_id: str, quantity: int):
# 实际扣减:已冻结的库存转为已售出(此处简化,直接丢弃冻结数)
conn = sqlite3.connect('inventory.db')
conn.execute(
"UPDATE stock SET frozen = frozen - ? WHERE product_id=?",
(quantity, product_id)
)
conn.commit()
conn.close()
return {"status": "confirmed"}
@app.post("/cancel")
def cancel(product_id: str, quantity: int):
# 取消冻结:把冻结的库存还回去
conn = sqlite3.connect('inventory.db')
conn.execute(
"UPDATE stock SET frozen = frozen - ?, available = available + ? WHERE product_id=?",
(quantity, quantity, product_id)
)
conn.commit()
conn.close()
return {"status": "cancelled"}
if __name__ == "__main__":
init_db()
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8001)
步骤 2:编写订单服务
# order_service.py
from fastapi import FastAPI, HTTPException
import sqlite3
app = FastAPI()
def init_db():
conn = sqlite3.connect('orders.db')
conn.execute('''CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
product_id TEXT,
status TEXT
)''')
conn.commit()
conn.close()
@app.post("/create")
def create_order(order_id: str, product_id: str):
conn = sqlite3.connect('orders.db')
try:
conn.execute(
"INSERT INTO orders (order_id, product_id, status) VALUES (?, ?, 'created')",
(order_id, product_id)
)
conn.commit()
return {"status": "order_created"}
except sqlite3.IntegrityError:
raise HTTPException(status_code=400, detail="Order already exists")
finally:
conn.close()
@app.post("/confirm")
def confirm_order(order_id: str):
conn = sqlite3.connect('orders.db')
conn.execute("UPDATE orders SET status='confirmed' WHERE order_id=?", (order_id,))
conn.commit()
conn.close()
return {"status": "order_confirmed"}
@app.post("/cancel")
def cancel_order(order_id: str):
conn = sqlite3.connect('orders.db')
conn.execute("DELETE FROM orders WHERE order_id=?", (order_id,))
conn.commit()
conn.close()
return {"status": "order_cancelled"}
if __name__ == "__main__":
init_db()
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8002)
步骤 3:客户端协调 TCC 流程
# main_client.py
import requests
import time
import uuid
INVENTORY_URL = "http://127.0.0.1:8001"
ORDER_URL = "http://127.0.0.1:8002"
def place_order(product_id: str, quantity: int):
order_id = str(uuid.uuid4())
print(f"下单开始,订单ID: {order_id}")
# Step 1: Try 阶段
try:
print("→ 尝试冻结库存...")
requests.post(f"{INVENTORY_URL}/try_reserve",
json={"product_id": product_id, "quantity": quantity}, timeout=2)
print("✓ 库存冻结成功")
print("→ 尝试创建订单...")
requests.post(f"{ORDER_URL}/create",
json={"order_id": order_id, "product_id": product_id}, timeout=2)
print("✓ 订单创建成功")
except Exception as e:
print(f"✗ Try 阶段失败: {e}")
# 执行 Cancel
_cancel_all(order_id, product_id, quantity)
return False
# Step 2: Confirm 阶段
try:
print("→ 确认库存扣减...")
requests.post(f"{INVENTORY_URL}/confirm",
json={"product_id": product_id, "quantity": quantity})
print("→ 确认订单...")
requests.post(f"{ORDER_URL}/confirm",
json={"order_id": order_id})
print("✅ 下单成功!")
return True
except Exception as e:
print(f"✗ Confirm 阶段失败: {e}")
# 注意:Confirm 失败通常需要人工介入或重试机制
# 这里简化处理,仅打印错误
return False
def _cancel_all(order_id, product_id, quantity):
"""统一取消所有 Try 操作"""
try:
requests.post(f"{INVENTORY_URL}/cancel",
json={"product_id": product_id, "quantity": quantity})
print("✓ 库存冻结已取消")
except:
print("⚠️ 库存取消失败(可能已自动过期)")
try:
requests.post(f"{ORDER_URL}/cancel",
json={"order_id": order_id})
print("✓ 订单已取消")
except:
print("⚠️ 订单取消失败")
if __name__ == "__main__":
# 启动前请先运行 inventory_service.py 和 order_service.py
place_order("product_001", 2)
运行测试
- 终端 1:
python inventory_service.py - 维终端 2:
python order_service.py - 终端 3:
python main_client.py
你会看到类似输出:
下单开始,订单ID: a1b2c3...
→ 尝试冻结库存...
✓ 库存冻结成功
→ 尝试创建订单...
✓ 订单创建成功
→ 确认库存扣减...
→ 确认订单...
✅ 下单成功!
🔍 查看数据库:
sqlite3 inventory.db "SELECT * FROM stock;"会发现available=8, frozen=0,说明事务成功。
实战二:用消息队列实现最终一致性(更轻量)
TCC 需要每个服务都实现三套接口,成本较高。另一种思路是:用消息驱动。
核心思想:
- 订单服务创建“待支付”订单
- 发送消息到队列:“请扣库存”
- 库存服务消费消息,执行扣减
- 如果失败,消息重试,直到成功(最终一致)
我们用 Python 内置队列模拟消息中间件(生产环境可用 RabbitMQ/Kafka)。
修改订单服务(发送消息)
# 在 order_service.py 中增加
from queue import Queue
import threading
# 全局消息队列(模拟)
MESSAGE_QUEUE = Queue()
@app.post("/create_and_notify")
def create_and_notify(order_id: str, product_id: str, quantity: int):
# 1. 创建订单(状态为 pending)
conn = sqlite3.connect('orders.db')
conn.execute(
"INSERT INTO orders (order_id, product_id, status) VALUES (?, ?, 'pending')",
(order_id, product_id)
)
conn.commit()
conn.close()
# 2. 发送消息
MESSAGE_QUEUE.put({
"order_id": order_id,
"product_id": product_id,
"quantity": quantity
})
return {"status": "order_pending", "message_queued": True}
库存服务监听消息
# 在 inventory_service.py 中增加
import threading
import time
def consume_messages():
"""后台线程:消费消息并扣库存"""
while True:
if not MESSAGE_QUEUE.empty():
msg = MESSAGE_QUEUE.get()
print(f"📦 收到消息: 扣减 {msg['quantity']} 件 {msg['product_id']}")
# 模拟扣库存(简化:直接扣减,不冻结)
conn = sqlite3.connect('inventory.db')
cur = conn.cursor()
cur.execute("SELECT available FROM stock WHERE product_id=?", (msg["product_id"],))
available = cur.fetchone()[0]
if available >= msg["quantity"]:
conn.execute(
"UPDATE stock SET available = available - ? WHERE product_id=?",
(msg["quantity"], msg["product_id"])
)
conn.commit()
print("✅ 库存扣减成功")
else:
print("❌ 库存不足,消息将重试...")
# 重新入队(模拟重试)
MESSAGE_QUEUE.put(msg)
time.sleep(2) # 等待2秒再试
conn.close()
time.sleep(0.5)
# 启动消费线程
threading.Thread(target=consume_messages, daemon=True).start()
💡 这种方式代码更简单,适合对一致性要求不是“强实时”的场景(如积分、日志同步)。
新手常见问题解答
Q1:为什么不用数据库的 XA 事务?
XA 是标准的 2PC 协议,但性能差、锁时间长,且很多 NoSQL 不支持。现代互联网系统更倾向柔性事务(如 TCC、Saga)。
Q2:Confirm 或 Cancel 失败怎么办?
- Cancel 失败:通常设计为幂等操作(多次调用结果一致),或设置过期自动清理。
- Confirm 失败:需要监控告警 + 人工介入。高级方案可用“最大努力通知”。
Q3:Kimi 在这里有什么用?
你可能注意到文章提到了 Kimi。作为 AI 编程助手,Kimi 能帮你:
- 自动生成 TCC 接口模板
- 解释分布式事务的异常场景
- 推荐适合你业务的方案
例如,在 Kimi 中输入:“用 Python 写一个 TCC 库存服务示例”,它能快速给出结构清晰的代码框架,节省你查文档的时间。
Q4:如何测试分布式事务的异常?
在 main_client.py 中故意注释掉某一步,比如:
# requests.post(f"{ORDER_URL}/create", ...) # 模拟订单服务宕机
观察 Cancel 是否正确触发。
下一步学习建议
你已经掌握了分布式事务的核心思想!接下来可以:
- 深入原理:阅读《Designing Data-Intensive Applications》第9章
- 实战进阶:
- 用 Seata(开源分布式事务框架)替换手动 TCC
- 集成 RabbitMQ 实现可靠消息
- 工具推荐:
- Locust:压测你的事务接口
- Prometheus + Grafana:监控事务成功率
- Kimi:辅助理解复杂场景(如“网络分区下的事务行为”)
🌟 最后提醒:没有“银弹”。选择方案前,先问自己:
- 我的业务能容忍多久的不一致?
- 回滚成本高吗?
- 团队能否维护复杂补偿逻辑?
分布式事务不是魔法,而是一套权衡的艺术。希望这篇教程能成为你架构之路的第一块基石。
动手试试吧!代码跑通的那一刻,你会发现:原来“高大上”的概念,也不过是几行清晰的逻辑而已。

评论 0