分布式事务太难?用 Python 爬虫也能理解的最佳实践指南

无敌之游侠
2025-12-22 02:13
阅读 228

大家好,我是一名开源项目维护者,也经常给新人做后端技术培训。今天写这篇教程,是因为我发现很多刚接触分布式系统的同学,一听到“分布式事务”就头疼,总觉得那是高深莫测的“大厂专属技术”。其实不然!

我当初学的时候,也是从零开始,甚至一度以为这东西只能靠背面试题应付。直到我在一个用 Python 写的小型爬虫项目中意外遇到了数据一致性问题,才真正理解了分布式事务的本质。

更巧的是——你完全可以用一个简单的 Python 爬虫场景来理解它!

所以,本文将带你用最接地气的方式,搞懂分布式事务的核心思想和最佳实践。即使你连“事务”是什么都不清楚,也没关系。我们从零开始,边做边学。


为什么你需要关心分布式事务?

想象一下:你写了一个 Python 爬虫,每天从多个网站抓取商品价格,并把数据存到数据库里。某天你想加个新功能:

“当成功爬取到商品信息后,自动发一条通知到消息队列(比如 RabbitMQ),告诉其他服务‘有新数据来了’。”

听起来很简单?但问题来了:

  • 如果先存数据库成功,但发消息失败 → 数据库有数据,但其他服务不知道,导致后续流程卡住。
  • 如果先发消息成功,但存数据库失败 → 其他服务收到通知去查数据,却发现没有,造成混乱。

这就是典型的跨系统操作的一致性问题——而解决这类问题的技术,就叫 分布式事务

📌 一句话定义:分布式事务,就是让多个独立系统(比如数据库、消息队列、缓存)在一次业务操作中,要么全部成功,要么全部失败,保持数据一致。


环境准备:5分钟搭建学习环境

我们不需要复杂的微服务架构!只需要以下工具:

工具 版本要求 用途
Python ≥ 3.8 编写爬虫和模拟服务
pip 最新版 安装依赖
SQLite 内置 本地数据库
Redis(可选) ≥ 6.0 模拟外部存储或缓存
虚拟环境 venvconda 隔离依赖

步骤 1:创建项目目录

mkdir distributed-tx-demo
cd distributed-tx-demo
python -m venv venv
source venv/bin/activate  # Linux/Mac
# 或 venv\Scripts\activate  # Windows

步骤 2:安装必要依赖

pip install requests sqlite3 redis

💡 提示:如果你还没装 Redis,可以先跳过,我们后面会提供纯 Python 的替代方案。

步骤 3:验证环境

新建 test_env.py

import sqlite3
import requests

# 测试数据库
conn = sqlite3.connect(":memory:")
print("✅ SQLite OK")

# 测试网络请求
resp = requests.get("https://httpbin.org/get")
print("✅ Requests OK" if resp.status_code == 200 else "❌ Network error")

运行:

python test_env.py

如果看到两个 ✅,说明环境准备完毕!


核心概念:用爬虫故事讲清分布式事务

1. 什么是“事务”?

先别被术语吓到。事务其实就是一组操作,必须满足四个特性(ACID):

  • Atomicity(原子性):要么全做,要么全不做。
  • Consistency(一致性):操作前后,数据都合法。
  • Isolation(隔离性):多个事务互不干扰。
  • Durability(持久性):一旦成功,结果永久保存。

单机数据库(比如 MySQL)天然支持事务。但当你把操作分散到不同系统(比如数据库 + 消息队列),就变成“分布式”了。

2. 常见的分布式事务解决方案

目前主流方案有三种,我们用爬虫场景类比:

方案 类比故事 适用场景 优缺点
两阶段提交(2PC) 爬虫先问所有系统:“你们准备好存数据了吗?”等全部说“OK”,再统一执行。 强一致性要求高,系统少 可靠但性能差,容易阻塞
TCC(Try-Confirm-Cancel) 爬虫先“冻结”资源(Try),确认无误后再“提交”(Confirm),出错就“释放”(Cancel)。 金融、订单等关键业务 灵活但开发复杂
最终一致性(消息队列) 爬虫存完数据库,发一条“可靠消息”;如果消息失败,就重试直到成功。 大多数互联网场景 简单高效,但有短暂不一致

🔍 重点:对初学者来说,最终一致性 + 消息队列 是最容易上手且最常用的方案!


实战:用 Python 爬虫实现“最终一致性”

我们来做一个小项目:爬取 GitHub 用户信息,存入数据库,并发送通知

目标:确保“存数据库”和“发通知”最终一致。

第一步:设计数据表

# db_setup.py
import sqlite3

def init_db():
    conn = sqlite3.connect("github_users.db")
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            username TEXT UNIQUE,
            name TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # 新增消息状态表(用于重试机制)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS message_queue (
            id INTEGER PRIMARY KEY,
            user_id INTEGER,
            status TEXT DEFAULT 'pending', -- pending / sent / failed
            retry_count INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    conn.close()

if __name__ == "__main__":
    init_db()
    print("✅ 数据库初始化完成")

第二步:编写“伪消息队列”逻辑

因为我们没用 RabbitMQ/Kafka,就用数据库表模拟消息队列。

# message_service.py
import sqlite3
import time
import random

class FakeMessageQueue:
    def send_notification(self, user_id):
        """模拟发送通知,有 20% 概率失败"""
        if random.random() < 0.2:
            raise Exception("Network timeout")
        print(f"📨 通知已发送: user_id={user_id}")
        return True

    def mark_message_sent(self, user_id):
        conn = sqlite3.connect("github_users.db")
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE message_queue SET status='sent' WHERE user_id=?",
            (user_id,)
        )
        conn.commit()
        conn.close()

    def mark_message_failed(self, user_id):
        conn = sqlite3.connect("github_users.db")
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE message_queue 
            SET status='failed', retry_count=retry_count+1 
            WHERE user_id=?
        """, (user_id,))
        conn.commit()
        conn.close()

第三步:主爬虫逻辑(含事务保障)

# crawler.py
import sqlite3
import requests
from message_service import FakeMessageQueue

mq = FakeMessageQueue()

def save_user_and_notify(username):
    conn = sqlite3.connect("github_users.db")
    cursor = conn.cursor()
    
    try:
        # 1. 获取用户数据
        resp = requests.get(f"https://api.github.com/users/{username}")
        if resp.status_code != 200:
            raise Exception("User not found")
        user_data = resp.json()
        
        # 2. 插入用户(带唯一约束)
        cursor.execute("""
            INSERT OR IGNORE INTO users (username, name) VALUES (?, ?)
        """, (user_data['login'], user_data.get('name', '')))
        user_id = cursor.lastrowid
        
        if user_id == 0:
            # 用户已存在,查 ID
            cursor.execute("SELECT id FROM users WHERE username=?", (username,))
            user_id = cursor.fetchone()[0]
        
        # 3. 插入消息任务(关键!先记录再执行)
        cursor.execute("""
            INSERT OR IGNORE INTO message_queue (user_id) VALUES (?)
        """, (user_id,))
        
        conn.commit()  # 👈 此时事务结束:数据和消息任务都持久化了
        print(f"💾 用户 {username} 保存成功,ID={user_id}")
        
        # 4. 尝试发送通知(可失败,但任务已记录)
        try:
            mq.send_notification(user_id)
            mq.mark_message_sent(user_id)
        except Exception as e:
            print(f"⚠️ 通知发送失败: {e}")
            mq.mark_message_failed(user_id)
            # 后续由定时任务重试
            
    except Exception as e:
        conn.rollback()
        print(f"❌ 操作失败: {e}")
    finally:
        conn.close()

第四步:添加“重试机制”(保障最终一致性)

# retry_worker.py
import sqlite3
import time
from message_service import FakeMessageQueue

mq = FakeMessageQueue()

def retry_failed_messages(max_retries=3):
    conn = sqlite3.connect("github_users.db")
    cursor = conn.cursor()
    
    # 查找失败次数 < max_retries 的任务
    cursor.execute("""
        SELECT user_id FROM message_queue 
        WHERE status = 'failed' AND retry_count < ?
    """, (max_retries,))
    
    tasks = cursor.fetchall()
    for (user_id,) in tasks:
        print(f"🔁 重试发送通知: user_id={user_id}")
        try:
            mq.send_notification(user_id)
            mq.mark_message_sent(user_id)
        except Exception as e:
            print(f"⚠️ 重试仍失败: {e}")
            mq.mark_message_failed(user_id)  # retry_count 自增
    
    conn.close()

# 每10秒检查一次
if __name__ == "__main__":
    while True:
        retry_failed_messages()
        time.sleep(10)

运行整个流程

终端 1(启动重试服务):

python retry_worker.py

终端 2(运行爬虫):

python crawler.py
# 在代码末尾加:
if __name__ == "__main__":
    save_user_and_notify("octocat")  # GitHub 官方测试账号

你会看到:

  • 即使第一次通知失败,重试服务会在几秒后再次尝试
  • 最终,数据库和“通知”状态都会一致

这就是“最终一致性”的核心:通过“记录 + 重试”机制,保证操作最终成功!


新手常见问题解答

Q1:为什么不能直接在 try 里发消息,失败就回滚数据库?

因为消息队列(如 RabbitMQ)不支持回滚!一旦消息发出,就无法撤回。所以必须先确保本地操作成功,再发消息。

Q2:如果重试一直失败怎么办?

  • 设置最大重试次数(比如 3 次)
  • 失败后人工介入(写日志、发告警)
  • 或降级处理(比如记录到“死信队列”)

Q3:这个方案能用在生产环境吗?

对于非金融场景(如日志、通知、缓存更新),完全可以!很多大厂(包括阿里、美团)的核心链路初期都用类似方案。

Q4:和 Seata、Atomikos 这些框架比呢?

那些是基于 2PC/TCC 的强一致性方案,适合银行转账。而我们的方案更轻量,适合大多数互联网应用。


学习建议与下一步

📚 推荐学习路径

  1. 先掌握单机事务:用 SQLite/MySQL 练习 BEGIN, COMMIT, ROLLBACK
  2. 理解消息队列原理:学习 RabbitMQ 或 Kafka 的基本使用
  3. 尝试真实中间件:用 Python 的 pika 库连接 RabbitMQ 替代我们的“伪队列”
  4. 了解 Saga 模式:另一种最终一致性实现方式

⚠️ 避坑指南

  • 不要追求“实时强一致”:除非是支付、余额场景,否则最终一致性足够
  • 一定要记录操作日志:方便排查“到底哪一步失败了”
  • 重试要有退避策略:比如第一次 1 秒后重试,第二次 2 秒,第三次 4 秒……

🔧 扩展练习

  • 把 SQLite 换成 MySQL,观察事务行为
  • 用 Redis 作为缓存,爬取后同时更新 DB 和缓存,如何保证一致?
  • 加一个“删除用户”功能,实现 TCC 中的 Cancel 逻辑

结语

分布式事务听起来高大上,但本质就是想办法让多个系统“步调一致”。通过一个简单的 Python 爬虫项目,你已经掌握了最实用的“最终一致性”方案。

记住:技术不是为了炫技,而是为了解决实际问题。哪怕你只是写个小爬虫,只要涉及多系统协作,就可能遇到一致性挑战。

希望这篇教程能帮你迈出第一步。如果你觉得有用,欢迎关注我的开源项目(GitHub 搜 distributed-tx-demo),我会持续更新更多实战案例!

最后送你一句话:“复杂的系统,往往始于简单的思考。” —— 共勉!

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝