分布式事务不再难:手把手带你用Python实现可靠系统
大家好,我是一名开源项目维护者,也是一名后端讲师。过去几年里,我参与维护了多个高并发的微服务系统,也写过不少技术文档。今天之所以要写这篇教程,是因为我当初学分布式事务时踩过太多坑——概念抽象、资料晦涩、代码复杂,让人望而却步。其实,只要从实践出发,零基础也能搞懂它。
这篇文章不讲大道理,只带你动手做。我们会用 Python 写一个简单的订单-库存系统,并通过真实代码理解“分布式事务”到底怎么保证数据一致。即使你刚学完 Python 基础,也能跟得上!
为什么需要分布式事务?
想象一下:你在电商平台下单买书。系统要做两件事:
- 扣减库存(库存服务)
- 创建订单(订单服务)
这两个操作分别在两个不同的数据库里执行。如果第一步成功了,第二步却失败了(比如网络中断),就会出现“库存少了但没生成订单”的问题——用户白亏,商家也亏。
单体应用中,我们可以用数据库事务(BEGIN...COMMIT)保证原子性。但在分布式系统中,多个服务各自管理自己的数据库,传统事务就失效了。
这时候,就需要分布式事务解决方案来协调多个服务,确保“要么全成功,要么全回滚”。
💡 小知识:分布式事务 ≠ 数据库事务。它是跨服务、跨数据库的一致性保障机制。
环境准备:5分钟搭好开发环境
我们用最轻量的方式搭建实验环境,不需要 Docker 或复杂中间件。
所需工具清单
| 工具 | 版本要求 | 安装方式 |
|---|---|---|
| Python | ≥3.8 | 官网下载或 pyenv |
| pip | 最新 | 通常随 Python 自带 |
| Flask | 最新 | pip install flask |
| SQLite | 内置 | 无需安装 |
| requests | 最新 | pip install requests |
✅ 提示:本文所有代码兼容 Python 3.8+,无需虚拟环境也可运行(但建议使用
venv隔离依赖)。
创建项目结构
mkdir distributed-tx-demo
cd distributed-tx-demo
touch inventory.py order.py main.py requirements.txt
然后安装依赖:
echo "flask\nrequests" > requirements.txt
pip install -r requirements.txt
搞定!接下来我们就要动手写服务了。
核心概念:用生活例子讲清楚三种方案
分布式事务有多种实现方式。对初学者来说,掌握以下三种就够了:
1. 两阶段提交(2PC)——像婚礼主持人
类比:主持人问新郎“你愿意吗?”(准备阶段),再问新娘“你愿意吗?”,只有两人都说“愿意”,才宣布结婚(提交阶段)。
特点:
- 强一致性
- 性能差,容易阻塞
- 适合数据库内部(如 MySQL XA),不适合业务服务
🚫 初学者慎用:2PC 在业务层实现复杂,且对服务侵入性强。
2. TCC(Try-Confirm-Cancel)——像订酒店
- Try:预占房间(检查库存并冻结)
- Confirm:真正入住(扣减库存)
- Cancel:取消预订(释放冻结)
优点:灵活、高性能
缺点:每个服务都要写三套逻辑,开发成本高
3. 本地消息表 + 最终一致性 —— 我们的选择!
这是最适合初学者的方案。核心思想是:
把“跨服务调用”变成“本地事务 + 异步重试”
具体步骤:
- 在订单服务中,先写本地消息表(和创建订单在一个事务里)
- 后台任务轮询消息表,异步调用库存服务
- 如果调用失败,不断重试直到成功(最终一致)
✅ 优点:简单、可靠、不依赖外部中间件
✅ 适合 80% 的业务场景(如电商、支付)
👨🏫 我当初第一个上线的分布式系统就是用这个方案,至今稳定运行三年。
实战:用 Python 实现订单-库存系统
我们现在动手实现一个简化版系统,包含两个服务:
order.py:订单服务(含本地消息表)inventory.py:库存服务
第一步:实现库存服务
# inventory.py
from flask import Flask, request, jsonify
import sqlite3
app = Flask(__name__)
def init_db():
conn = sqlite3.connect('inventory.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS stock (
item_id TEXT PRIMARY KEY,
quantity INTEGER
)''')
# 初始化一本书,库存为10
c.execute("INSERT OR IGNORE INTO stock (item_id, quantity) VALUES ('book-001', 10)")
conn.commit()
conn.close()
@app.route('/deduct', methods=['POST'])
def deduct_stock():
data = request.json
item_id = data['item_id']
amount = data['amount']
conn = sqlite3.connect('inventory.db')
c = conn.cursor()
# 检查库存是否足够
c.execute("SELECT quantity FROM stock WHERE item_id = ?", (item_id,))
row = c.fetchone()
if not row or row[0] < amount:
return jsonify({"error": "Insufficient stock"}), 400
# 扣减库存
c.execute("UPDATE stock SET quantity = quantity - ? WHERE item_id = ?", (amount, item_id))
conn.commit()
conn.close()
return jsonify({"status": "success"})
if __name__ == '__main__':
init_db()
app.run(port=5001)
启动命令:
python inventory.py
第二步:实现订单服务(含本地消息表)
# order.py
from flask import Flask, request, jsonify
import sqlite3
import threading
import time
import requests
app = Flask(__name__)
def init_db():
conn = sqlite3.connect('order.db')
c = conn.cursor()
# 订单表
c.execute('''CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
item_id TEXT,
amount INTEGER
)''')
# 本地消息表
c.execute('''CREATE TABLE IF NOT EXISTS outbox (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id TEXT,
status TEXT DEFAULT 'pending' -- pending, success, failed
)''')
conn.commit()
conn.close()
def send_to_inventory(order_id):
"""模拟异步发送消息(实际可用 Celery 或线程池)"""
conn = sqlite3.connect('order.db')
c = conn.cursor()
c.execute("SELECT item_id, amount FROM orders WHERE order_id = ?", (order_id,))
order = c.fetchone()
conn.close()
if not order:
return
item_id, amount = order
try:
resp = requests.post('http://localhost:5001/deduct', json={
"item_id": item_id,
"amount": amount
}, timeout=3)
if resp.status_code == 200:
# 更新消息状态为成功
conn = sqlite3.connect('order.db')
c = conn.cursor()
c.execute("UPDATE outbox SET status = 'success' WHERE order_id = ?", (order_id,))
conn.commit()
conn.close()
print(f"[SUCCESS] Deducted stock for order {order_id}")
else:
raise Exception("Inventory service rejected")
except Exception as e:
print(f"[RETRYING] Failed to deduct stock for {order_id}: {e}")
# 这里可以记录失败次数,超过阈值告警
time.sleep(2) # 简单重试
send_to_inventory(order_id) # 递归重试(生产环境改用队列)
@app.route('/create_order', methods=['POST'])
def create_order():
data = request.json
order_id = data['order_id']
item_id = data['item_id']
amount = data['amount']
conn = sqlite3.connect('order.db')
c = conn.cursor()
try:
# 1. 创建订单
c.execute("INSERT INTO orders (order_id, item_id, amount) VALUES (?, ?, ?)",
(order_id, item_id, amount))
# 2. 插入本地消息(同一个事务!)
c.execute("INSERT INTO outbox (order_id) VALUES (?)", (order_id,))
conn.commit()
# 3. 异步处理消息(新开线程避免阻塞HTTP请求)
threading.Thread(target=send_to_inventory, args=(order_id,)).start()
return jsonify({"status": "order created, processing stock deduction"})
except Exception as e:
conn.rollback()
return jsonify({"error": str(e)}), 500
finally:
conn.close()
# 启动后台重试任务(简化版)
def start_retry_worker():
def worker():
while True:
time.sleep(5)
conn = sqlite3.connect('order.db')
c = conn.cursor()
c.execute("SELECT order_id FROM outbox WHERE status = 'pending'")
pending = c.fetchall()
conn.close()
for (order_id,) in pending:
threading.Thread(target=send_to_inventory, args=(order_id,)).start()
threading.Thread(target=worker, daemon=True).start()
if __name__ == '__main__':
init_db()
start_retry_worker() # 启动重试线程
app.run(port=5000)
启动命令:
python order.py
第三步:测试整个流程
打开第三个终端,发送下单请求:
curl -X POST http://localhost:5000/create_order \
-H "Content-Type: application/json" \
-d '{"order_id": "ord-123", "item_id": "book-001", "amount": 1}'
你会看到:
- 订单服务返回成功
- 终端打印
[SUCCESS] Deducted stock for order ord-123 - 查询库存:
sqlite3 inventory.db "SELECT * FROM stock;"→ 数量变为9
✅ 这就是最终一致性:订单先创建,库存稍后扣减,但最终数据一致。
新手常见问题解答
Q1:为什么不用数据库的 XA 事务(2PC)?
XA 对业务代码侵入大,且要求所有参与方支持 XA 协议(很多 NoSQL 不支持)。本地消息表方案更通用、更易调试。
Q2:消息重复投递怎么办?库存被多扣?
好问题!这就是为什么库存服务必须幂等。我们的 /deduct 接口虽然没显式处理,但你可以改进:
- 方案1:传入唯一请求 ID,服务端记录已处理的 ID
- 方案2:用“设置新库存值”代替“扣减”,避免多次扣减
Q3:Claude Code 和 Prompt 工程在这里有什么用?
你可能注意到了文章开头提到的关键词。这里解释一下:
Claude Code 是 Anthropic 推出的 AI 编程助手,可帮助生成分布式事务的样板代码。
Prompt 工程 指如何有效提问 AI。例如,你可以这样问 Claude:
“用 Python 写一个基于本地消息表的分布式事务 demo,包含订单和库存服务,使用 Flask 和 SQLite。”
好的 prompt 能让 AI 输出更贴近你需求的代码(比如指定 v0 版本、避免复杂依赖)。
🔧 实践建议:初学者可先手写代码理解原理,再用 AI 工具加速开发。
Q4:生产环境能这么简单吗?
不能。真实系统还需要:
- 消息表增加重试次数限制
- 失败消息进入死信队列人工干预
- 使用 RabbitMQ/Kafka 替代轮询
- 加入监控和告警
但核心思想不变——先保证本地事务,再异步同步。
下一步学习建议
你已经掌握了分布式事务的核心思路!接下来可以:
- 深入 TCC 模式:尝试改造当前系统,加入 Try/Confirm/Cancel 三个接口
- 引入消息队列:用 Redis 或 RabbitMQ 替代轮询,提升性能
- 学习 Saga 模式:适用于长流程事务(如旅行预订)
- 阅读开源项目:如 Seata(Java)、DTM(Go)的源码
📌 避坑指南:不要一上来就学 Seata 或 RocketMQ 事务消息!先掌握“本地消息表”这种朴素但有效的方案,再进阶。
结语
分布式事务听起来高大上,但本质是“把复杂问题拆解成简单步骤”。我们用不到 100 行 Python 代码,就实现了一个可靠的最终一致性系统。
记住:没有银弹,只有合适场景的解决方案。对大多数业务,本地消息表 + 幂等设计已经足够。
希望这篇实践驱动的教程能帮你迈出第一步。如果你跑通了代码,欢迎在评论区告诉我!也别忘了 star 你的第一个分布式项目 😄
作者:开源项目维护者 | 教学理念:先跑起来,再优化

评论 0