分布式事务太难?用 Python 手把手带你搞定最佳实践
大家好,我是一名工作 5 年的后端开发工程师,也带过不少新人。最近面试时发现,很多同学对“分布式事务”这个词一脸懵——既说不清它是什么,也不知道怎么解决。其实我当初学的时候也卡在这儿很久,翻文档、看论文、试错无数次,才慢慢搞明白。
今天这篇教程,就是想用最直白的语言 + 最简单的 Python 代码,带你从零理解分布式事务的核心思想和实战方案。无论你是刚学编程的小白,还是准备面试的求职者,都能跟着一步步动手做出来。
💡 本文特别适合:
- 完全没接触过分布式系统的新手
- 正在准备后端/架构类面试题(比如“如何保证跨服务数据一致性?”)
- 想用 Python 实践分布式事务的同学
一、什么是分布式事务?为什么需要它?
先别被“事务”吓到!你可以把它想象成“一组操作必须全部成功,或者全部失败”。
单机事务 vs 分布式事务
- 单机事务(比如 MySQL 的
BEGIN...COMMIT):所有操作都在同一个数据库里,很容易保证一致性。 - 分布式事务:你的系统拆成了多个服务(比如用户服务、订单服务、库存服务),每个服务有自己的数据库。这时候,一个业务操作可能要同时改多个库——万一中间某个步骤失败了,其他库已经改了,数据就乱了!
举个栗子🌰:
用户下单 → 扣库存 + 创建订单 + 扣余额
如果“扣库存”成功了,但“创建订单”失败了,那库存白白少了,钱也没收,老板要哭了 😭
所以,分布式事务的目标就是:让跨服务的操作像单机事务一样可靠。
二、环境准备:3 分钟搭好 Python 开发环境
我们不需要复杂的中间件,只用 Python + 两个 SQLite 数据库(模拟两个服务),就能演示核心逻辑。
所需工具
| 工具 | 版本 | 说明 |
|---|---|---|
| Python | ≥3.8 | 推荐 3.10+ |
| pip | 自带 | 包管理器 |
| sqlite3 | 自带 | 轻量级数据库 |
安装依赖
pip install flask sqlalchemy
✅ Flask 用来模拟两个微服务
✅ SQLAlchemy 是 ORM,方便操作数据库
创建项目结构
mkdir distributed-tx-demo
cd distributed-tx-demo
touch order_service.py inventory_service.py main.py
三、核心概念:三大主流方案一句话讲清
分布式事务有几种经典解法,新手最容易混淆。我用大白话总结:
| 方案 | 核心思想 | 适用场景 | 缺点 |
|---|---|---|---|
| 2PC(两阶段提交) | “先问大家能不能提交,都同意再真提交” | 强一致性要求高 | 性能差、阻塞风险 |
| TCC(Try-Confirm-Cancel) | “先冻结资源,再确认或回滚” | 金融、支付等关键业务 | 开发复杂度高 |
| Saga 模式 | “每个操作配一个补偿操作,失败就挨个回滚” | 长流程、最终一致性 | 不能保证隔离性 |
🎯 对于大多数互联网应用(比如电商、社交),Saga 是最实用的选择,也是我们今天要实战的方案!
四、实战:用 Python 实现一个 Saga 分布式事务
我们要实现一个简化版的“下单”流程:
- 调用库存服务 → 扣减商品数量
- 调用订单服务 → 创建订单
如果第 2 步失败,就调用库存服务的“补偿接口”把库存加回去。
第一步:写库存服务(inventory_service.py)
from flask import Flask, request, jsonify
import sqlite3
app = Flask(__name__)
# 初始化库存数据库
def init_db():
conn = sqlite3.connect('inventory.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY,
stock INTEGER
)
''')
# 初始库存:商品1 有 100 件
cursor.execute("INSERT OR IGNORE INTO products (id, stock) VALUES (1, 100)")
conn.commit()
conn.close()
@app.route('/deduct', methods=['POST'])
def deduct_stock():
data = request.json
product_id = data['product_id']
quantity = data['quantity']
conn = sqlite3.connect('inventory.db')
cursor = conn.cursor()
cursor.execute("UPDATE products SET stock = stock - ? WHERE id = ?", (quantity, product_id))
conn.commit()
conn.close()
return jsonify({"status": "success"})
@app.route('/compensate', methods=['POST'])
def compensate_stock():
"""补偿操作:把库存加回去"""
data = request.json
product_id = data['product_id']
quantity = data['quantity']
conn = sqlite3.connect('inventory.db')
cursor = conn.cursor()
cursor.execute("UPDATE products SET stock = stock + ? WHERE id = ?", (quantity, product_id))
conn.commit()
conn.close()
return jsonify({"status": "compensated"})
if __name__ == '__main__':
init_db()
app.run(port=5001)
第二步:写订单服务(order_service.py)
from flask import Flask, request, jsonify
import sqlite3
import random # 模拟偶尔失败
app = Flask(__name__)
def init_db():
conn = sqlite3.connect('orders.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
product_id INTEGER
)
''')
conn.commit()
conn.close()
@app.route('/create', methods=['POST'])
def create_order():
# 模拟 30% 概率失败(测试补偿逻辑)
if random.random() < 0.3:
return jsonify({"error": "Order creation failed!"}), 500
data = request.json
user_id = data['user_id']
product_id = data['product_id']
conn = sqlite3.connect('orders.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO orders (user_id, product_id) VALUES (?, ?)", (user_id, product_id))
conn.commit()
conn.close()
return jsonify({"status": "order created"})
if __name__ == '__main__':
init_db()
app.run(port=5002)
第三步:主协调器(main.py)—— Saga 的大脑
import requests
import time
def place_order(user_id, product_id, quantity):
print("🚀 开始下单流程...")
# Step 1: 扣库存
try:
resp = requests.post('http://localhost:5001/deduct', json={
'product_id': product_id,
'quantity': quantity
})
resp.raise_for_status()
print("✅ 库存已扣减")
except Exception as e:
print(f"❌ 扣库存失败: {e}")
return False
# Step 2: 创建订单
try:
resp = requests.post('http://localhost:5002/create', json={
'user_id': user_id,
'product_id': product_id
})
resp.raise_for_status()
print("✅ 订单创建成功!")
return True
except Exception as e:
print(f"❌ 创建订单失败: {e}")
# 触发补偿:把库存加回去!
try:
requests.post('http://localhost:5001/compensate', json={
'product_id': product_id,
'quantity': quantity
})
print("🔄 已执行补偿:库存恢复")
except Exception as ce:
print(f"⚠️ 补偿也失败了!需人工介入: {ce}")
return False
if __name__ == '__main__':
# 先启动两个服务(在另外两个终端运行 inventory_service.py 和 order_service.py)
time.sleep(2)
success = place_order(user_id=123, product_id=1, quantity=1)
if success:
print("\n🎉 整个事务成功!")
else:
print("\n💥 事务失败,已尝试补偿")
运行测试
- 终端1:
python inventory_service.py - 终端2:
python order_service.py - 终端3:
python main.py
你会看到两种结果:
- 大概率成功(70%)
- 小概率失败并自动补偿(30%,因为我们加了随机失败)
🔍 关键点:即使订单创建失败,库存也会通过
/compensate接口恢复,保证最终一致性!
五、新手常见问题 & 避坑指南
Q1:Saga 能保证“强一致性”吗?
不能! 它是“最终一致性”。在补偿完成前,数据可能短暂不一致(比如库存少了但订单没建)。适合能容忍短时不一致的场景。
Q2:补偿操作失败怎么办?
这是 Saga 的难点!建议:
- 补偿接口必须幂等(重复调用结果不变)
- 加重试机制(比如指数退避)
- 最终失败时告警,人工处理
Q3:有没有现成的框架?
有!Python 中可用:
- Celery + Redis:用任务队列实现异步 Saga
- Eventuate Tram(Java 更成熟) 但理解原理比用框架更重要!
Q4:面试常问什么?
高频面试题:
- “分布式事务有哪些方案?各有什么优劣?”
- “Saga 模式如何处理并发问题?”(答:通常靠业务层加锁或版本号)
- “补偿操作怎么设计?”
💬 我面试时最爱问:“如果补偿接口本身失败了,你的系统会怎样?” —— 能答出“重试+告警+人工兜底”的,基本就过了。
六、下一步学习建议
你已经掌握了分布式事务的核心思想!接下来可以:
- 深入 Saga:学习如何用消息队列(如 RabbitMQ/Kafka)实现事件驱动的 Saga
- 试试 TCC:用 Python 模拟 Try/Confirm/Cancel 三个接口
- 了解 Seata:阿里开源的分布式事务框架(虽然主要是 Java,但思想通用)
- 读经典论文:《Sagas》 by Hector Garcia-Molina (1987)
🌟 最后送你一句话:分布式系统的本质,不是追求完美,而是在不完美中找平衡。
希望这篇教程帮你少走弯路。如果你觉得有用,欢迎分享给正在挣扎的小伙伴!
— 一个踩过无数坑的后端老兵

评论 0