分布式事务太难?用 Python 爬虫也能理解的最佳实践指南
大家好,我是一名开源项目维护者,也经常给新人做后端技术培训。今天写这篇教程,是因为我发现很多刚接触分布式系统的同学,一听到“分布式事务”就头疼,总觉得那是高深莫测的“大厂专属技术”。其实不然!
我当初学的时候,也是从零开始,甚至一度以为这东西只能靠背面试题应付。直到我在一个用 Python 写的小型爬虫项目中意外遇到了数据一致性问题,才真正理解了分布式事务的本质。
更巧的是——你完全可以用一个简单的 Python 爬虫场景来理解它!
所以,本文将带你用最接地气的方式,搞懂分布式事务的核心思想和最佳实践。即使你连“事务”是什么都不清楚,也没关系。我们从零开始,边做边学。
为什么你需要关心分布式事务?
想象一下:你写了一个 Python 爬虫,每天从多个网站抓取商品价格,并把数据存到数据库里。某天你想加个新功能:
“当成功爬取到商品信息后,自动发一条通知到消息队列(比如 RabbitMQ),告诉其他服务‘有新数据来了’。”
听起来很简单?但问题来了:
- 如果先存数据库成功,但发消息失败 → 数据库有数据,但其他服务不知道,导致后续流程卡住。
- 如果先发消息成功,但存数据库失败 → 其他服务收到通知去查数据,却发现没有,造成混乱。
这就是典型的跨系统操作的一致性问题——而解决这类问题的技术,就叫 分布式事务。
📌 一句话定义:分布式事务,就是让多个独立系统(比如数据库、消息队列、缓存)在一次业务操作中,要么全部成功,要么全部失败,保持数据一致。
环境准备:5分钟搭建学习环境
我们不需要复杂的微服务架构!只需要以下工具:
| 工具 | 版本要求 | 用途 |
|---|---|---|
| Python | ≥ 3.8 | 编写爬虫和模拟服务 |
| pip | 最新版 | 安装依赖 |
| SQLite | 内置 | 本地数据库 |
| Redis(可选) | ≥ 6.0 | 模拟外部存储或缓存 |
| 虚拟环境 | venv 或 conda |
隔离依赖 |
步骤 1:创建项目目录
mkdir distributed-tx-demo
cd distributed-tx-demo
python -m venv venv
source venv/bin/activate # Linux/Mac
# 或 venv\Scripts\activate # Windows
步骤 2:安装必要依赖
pip install requests sqlite3 redis
💡 提示:如果你还没装 Redis,可以先跳过,我们后面会提供纯 Python 的替代方案。
步骤 3:验证环境
新建 test_env.py:
import sqlite3
import requests
# 测试数据库
conn = sqlite3.connect(":memory:")
print("✅ SQLite OK")
# 测试网络请求
resp = requests.get("https://httpbin.org/get")
print("✅ Requests OK" if resp.status_code == 200 else "❌ Network error")
运行:
python test_env.py
如果看到两个 ✅,说明环境准备完毕!
核心概念:用爬虫故事讲清分布式事务
1. 什么是“事务”?
先别被术语吓到。事务其实就是一组操作,必须满足四个特性(ACID):
- Atomicity(原子性):要么全做,要么全不做。
- Consistency(一致性):操作前后,数据都合法。
- Isolation(隔离性):多个事务互不干扰。
- Durability(持久性):一旦成功,结果永久保存。
单机数据库(比如 MySQL)天然支持事务。但当你把操作分散到不同系统(比如数据库 + 消息队列),就变成“分布式”了。
2. 常见的分布式事务解决方案
目前主流方案有三种,我们用爬虫场景类比:
| 方案 | 类比故事 | 适用场景 | 优缺点 |
|---|---|---|---|
| 两阶段提交(2PC) | 爬虫先问所有系统:“你们准备好存数据了吗?”等全部说“OK”,再统一执行。 | 强一致性要求高,系统少 | 可靠但性能差,容易阻塞 |
| TCC(Try-Confirm-Cancel) | 爬虫先“冻结”资源(Try),确认无误后再“提交”(Confirm),出错就“释放”(Cancel)。 | 金融、订单等关键业务 | 灵活但开发复杂 |
| 最终一致性(消息队列) | 爬虫存完数据库,发一条“可靠消息”;如果消息失败,就重试直到成功。 | 大多数互联网场景 | 简单高效,但有短暂不一致 |
🔍 重点:对初学者来说,最终一致性 + 消息队列 是最容易上手且最常用的方案!
实战:用 Python 爬虫实现“最终一致性”
我们来做一个小项目:爬取 GitHub 用户信息,存入数据库,并发送通知。
目标:确保“存数据库”和“发通知”最终一致。
第一步:设计数据表
# db_setup.py
import sqlite3
def init_db():
conn = sqlite3.connect("github_users.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
username TEXT UNIQUE,
name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 新增消息状态表(用于重试机制)
cursor.execute("""
CREATE TABLE IF NOT EXISTS message_queue (
id INTEGER PRIMARY KEY,
user_id INTEGER,
status TEXT DEFAULT 'pending', -- pending / sent / failed
retry_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
if __name__ == "__main__":
init_db()
print("✅ 数据库初始化完成")
第二步:编写“伪消息队列”逻辑
因为我们没用 RabbitMQ/Kafka,就用数据库表模拟消息队列。
# message_service.py
import sqlite3
import time
import random
class FakeMessageQueue:
def send_notification(self, user_id):
"""模拟发送通知,有 20% 概率失败"""
if random.random() < 0.2:
raise Exception("Network timeout")
print(f"📨 通知已发送: user_id={user_id}")
return True
def mark_message_sent(self, user_id):
conn = sqlite3.connect("github_users.db")
cursor = conn.cursor()
cursor.execute(
"UPDATE message_queue SET status='sent' WHERE user_id=?",
(user_id,)
)
conn.commit()
conn.close()
def mark_message_failed(self, user_id):
conn = sqlite3.connect("github_users.db")
cursor = conn.cursor()
cursor.execute("""
UPDATE message_queue
SET status='failed', retry_count=retry_count+1
WHERE user_id=?
""", (user_id,))
conn.commit()
conn.close()
第三步:主爬虫逻辑(含事务保障)
# crawler.py
import sqlite3
import requests
from message_service import FakeMessageQueue
mq = FakeMessageQueue()
def save_user_and_notify(username):
conn = sqlite3.connect("github_users.db")
cursor = conn.cursor()
try:
# 1. 获取用户数据
resp = requests.get(f"https://api.github.com/users/{username}")
if resp.status_code != 200:
raise Exception("User not found")
user_data = resp.json()
# 2. 插入用户(带唯一约束)
cursor.execute("""
INSERT OR IGNORE INTO users (username, name) VALUES (?, ?)
""", (user_data['login'], user_data.get('name', '')))
user_id = cursor.lastrowid
if user_id == 0:
# 用户已存在,查 ID
cursor.execute("SELECT id FROM users WHERE username=?", (username,))
user_id = cursor.fetchone()[0]
# 3. 插入消息任务(关键!先记录再执行)
cursor.execute("""
INSERT OR IGNORE INTO message_queue (user_id) VALUES (?)
""", (user_id,))
conn.commit() # 👈 此时事务结束:数据和消息任务都持久化了
print(f"💾 用户 {username} 保存成功,ID={user_id}")
# 4. 尝试发送通知(可失败,但任务已记录)
try:
mq.send_notification(user_id)
mq.mark_message_sent(user_id)
except Exception as e:
print(f"⚠️ 通知发送失败: {e}")
mq.mark_message_failed(user_id)
# 后续由定时任务重试
except Exception as e:
conn.rollback()
print(f"❌ 操作失败: {e}")
finally:
conn.close()
第四步:添加“重试机制”(保障最终一致性)
# retry_worker.py
import sqlite3
import time
from message_service import FakeMessageQueue
mq = FakeMessageQueue()
def retry_failed_messages(max_retries=3):
conn = sqlite3.connect("github_users.db")
cursor = conn.cursor()
# 查找失败次数 < max_retries 的任务
cursor.execute("""
SELECT user_id FROM message_queue
WHERE status = 'failed' AND retry_count < ?
""", (max_retries,))
tasks = cursor.fetchall()
for (user_id,) in tasks:
print(f"🔁 重试发送通知: user_id={user_id}")
try:
mq.send_notification(user_id)
mq.mark_message_sent(user_id)
except Exception as e:
print(f"⚠️ 重试仍失败: {e}")
mq.mark_message_failed(user_id) # retry_count 自增
conn.close()
# 每10秒检查一次
if __name__ == "__main__":
while True:
retry_failed_messages()
time.sleep(10)
运行整个流程
终端 1(启动重试服务):
python retry_worker.py
终端 2(运行爬虫):
python crawler.py
# 在代码末尾加:
if __name__ == "__main__":
save_user_and_notify("octocat") # GitHub 官方测试账号
你会看到:
- 即使第一次通知失败,重试服务会在几秒后再次尝试
- 最终,数据库和“通知”状态都会一致
✅ 这就是“最终一致性”的核心:通过“记录 + 重试”机制,保证操作最终成功!
新手常见问题解答
Q1:为什么不能直接在 try 里发消息,失败就回滚数据库?
因为消息队列(如 RabbitMQ)不支持回滚!一旦消息发出,就无法撤回。所以必须先确保本地操作成功,再发消息。
Q2:如果重试一直失败怎么办?
- 设置最大重试次数(比如 3 次)
- 失败后人工介入(写日志、发告警)
- 或降级处理(比如记录到“死信队列”)
Q3:这个方案能用在生产环境吗?
对于非金融场景(如日志、通知、缓存更新),完全可以!很多大厂(包括阿里、美团)的核心链路初期都用类似方案。
Q4:和 Seata、Atomikos 这些框架比呢?
那些是基于 2PC/TCC 的强一致性方案,适合银行转账。而我们的方案更轻量,适合大多数互联网应用。
学习建议与下一步
📚 推荐学习路径
- 先掌握单机事务:用 SQLite/MySQL 练习
BEGIN,COMMIT,ROLLBACK - 理解消息队列原理:学习 RabbitMQ 或 Kafka 的基本使用
- 尝试真实中间件:用 Python 的
pika库连接 RabbitMQ 替代我们的“伪队列” - 了解 Saga 模式:另一种最终一致性实现方式
⚠️ 避坑指南
- 不要追求“实时强一致”:除非是支付、余额场景,否则最终一致性足够
- 一定要记录操作日志:方便排查“到底哪一步失败了”
- 重试要有退避策略:比如第一次 1 秒后重试,第二次 2 秒,第三次 4 秒……
🔧 扩展练习
- 把 SQLite 换成 MySQL,观察事务行为
- 用 Redis 作为缓存,爬取后同时更新 DB 和缓存,如何保证一致?
- 加一个“删除用户”功能,实现 TCC 中的 Cancel 逻辑
结语
分布式事务听起来高大上,但本质就是想办法让多个系统“步调一致”。通过一个简单的 Python 爬虫项目,你已经掌握了最实用的“最终一致性”方案。
记住:技术不是为了炫技,而是为了解决实际问题。哪怕你只是写个小爬虫,只要涉及多系统协作,就可能遇到一致性挑战。
希望这篇教程能帮你迈出第一步。如果你觉得有用,欢迎关注我的开源项目(GitHub 搜 distributed-tx-demo),我会持续更新更多实战案例!
最后送你一句话:“复杂的系统,往往始于简单的思考。” —— 共勉!

评论 0