分布式事务怎么搞?手把手教你用Python实战最佳实践

卓越之导师
2026-03-02 10:28
阅读 629

大家好,我是小林,一名211高校的计算机研究生,平时喜欢在博客上分享后端开发经验。最近有好几个学弟问我:“分布式事务到底该怎么处理?网上资料太杂了,看得头晕。” 我特别理解这种感觉——我当初学的时候也是一头雾水,各种“两阶段提交”“TCC”“Saga”名词堆在一起,根本不知道从哪下手。

更别提现在还要和前端、AI工具(比如GPT-4)打交道,系统越来越复杂。所以今天我就用最接地气的方式,带零基础的朋友一步步搞懂分布式事务的最佳实践。我们还会结合 Python 编写真实代码,并巧妙利用 GPT-4 的 Function Calling 能力来辅助设计,甚至让前端也能安全参与进来!

别担心没基础,只要你写过几行 Python,就能跟上。


为什么我们需要分布式事务?

想象一个电商场景:用户下单时,要同时做两件事:

  1. 扣减库存(库存服务)
  2. 创建订单(订单服务)

这两个操作分别在不同的服务里。如果只成功了一个(比如库存扣了但订单没建),数据就乱了!这就是典型的分布式事务问题

💡 单体应用里用数据库事务(BEGIN/COMMIT)就能搞定;但微服务架构下,多个数据库独立运行,传统事务失效了。


环境准备:5分钟搭好开发环境

我们要用 Python 模拟两个微服务,并用轻量级框架快速验证。不需要 Docker 或复杂中间件,适合新手入门。

所需工具清单

工具 版本 用途
Python ≥3.8 主语言
FastAPI 最新 构建 REST API
httpx 最新 服务间调用
uvicorn 最新 运行服务
GPT-4 API(可选) - 辅助设计函数

安装命令

pip install fastapi uvicorn httpx

📌 提示:如果你没有 OpenAI API Key,可以跳过 GPT-4 部分,不影响核心逻辑学习。


核心概念:4种主流方案一句话讲清

别被术语吓到,其实就四种思路:

方案 原理 适合场景 缺点
2PC(两阶段提交) 协调者问所有参与者“能不能提交”,都同意才真提交 强一致性要求高,如银行转账 性能差,阻塞风险大
TCC(Try-Confirm-Cancel) 先预留资源(Try),再确认(Confirm)或回滚(Cancel) 高并发电商、支付 业务侵入强,需写三套逻辑
Saga 把大事务拆成多个本地事务,失败就逐个补偿 长流程业务(如旅行预订) 补偿逻辑复杂,可能不一致
消息队列+本地事务表 用消息确保最终一致性 大多数互联网场景 延迟高,不能强一致

最佳实践建议:对新人来说,消息队列+本地事务表是最友好的方案!它简单、可靠、容错强,而且不用改太多业务代码。

今天我们就用这个方案实战!


实战项目:用 Python 实现“下单扣库存”的最终一致性

我们将模拟两个服务:

  • 订单服务(Order Service)
  • 库存服务(Inventory Service)

目标:用户下单 → 订单服务创建订单 → 通知库存服务扣库存 → 若失败则重试直到成功。

第一步:搭建两个 FastAPI 服务

1. 订单服务(order_service.py

from fastapi import FastAPI, HTTPException
import httpx
import time
import random

app = FastAPI()

# 模拟本地数据库(实际用 MySQL/PostgreSQL)
orders_db = {}
inventory_url = "http://127.0.0.1:8001"

@app.post("/create_order")
async def create_order(user_id: int, item_id: int, quantity: int):
    order_id = f"order_{int(time.time())}"
    
    # 1. 先写本地订单(状态为 pending)
    orders_db[order_id] = {
        "user_id": user_id,
        "item_id": item_id,
        "quantity": quantity,
        "status": "pending"
    }

    # 2. 尝试调用库存服务
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{inventory_url}/deduct",
                json={"item_id": item_id, "quantity": quantity}
            )
            if resp.status_code == 200:
                orders_db[order_id]["status"] = "success"
                return {"order_id": order_id, "status": "success"}
            else:
                raise Exception("库存扣减失败")
    except Exception as e:
        # 3. 失败!标记为 failed,后续由定时任务重试
        orders_db[order_id]["status"] = "failed"
        return {"order_id": order_id, "status": "failed", "reason": str(e)}

2. 库存服务(inventory_service.py

from fastapi import FastAPI, HTTPException
import random

app = FastAPI()

# 模拟库存数据库
inventory_db = {101: 10, 102: 5}  # item_id -> stock

@app.post("/deduct")
def deduct_stock(item_id: int, quantity: int):
    # 模拟网络抖动导致偶尔失败(用于测试重试机制)
    if random.random() < 0.3:  # 30% 概率失败
        raise HTTPException(status_code=500, detail="临时故障")

    if inventory_db.get(item_id, 0) < quantity:
        raise HTTPException(status_code=400, detail="库存不足")
    
    inventory_db[item_id] -= quantity
    return {"status": "success", "remaining": inventory_db[item_id]}

第二步:实现“重试机制”——最终一致性的关键!

上面的代码有个问题:一旦库存服务失败,订单就卡在 failed 状态,不会自动恢复。

解决方案:加一个后台任务,定期扫描失败订单并重试。

order_service.py 中添加:

import asyncio

# 后台重试任务
async def retry_failed_orders():
    while True:
        await asyncio.sleep(5)  # 每5秒检查一次
        failed_orders = [
            oid for oid, o in orders_db.items() 
            if o["status"] == "failed"
        ]
        for oid in failed_orders:
            order = orders_db[oid]
            try:
                async with httpx.AsyncClient() as client:
                    resp = await client.post(
                        f"{inventory_url}/deduct",
                        json={
                            "item_id": order["item_id"],
                            "quantity": order["quantity"]
                        }
                    )
                    if resp.status_code == 200:
                        orders_db[oid]["status"] = "success"
                        print(f"✅ 订单 {oid} 重试成功!")
            except Exception as e:
                print(f"🔄 订单 {oid} 重试失败: {e}")

# 启动时运行后台任务
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(retry_failed_orders())

第三步:启动服务

开两个终端:

# 终端1:启动库存服务
uvicorn inventory_service:app --port 8001

# 终端2:启动订单服务
uvicorn order_service:app --port 8000

第四步:测试下单

用 curl 或 Postman 发请求:

curl -X POST "http://127.0.0.1:8000/create_order?user_id=1&item_id=101&quantity=2"

你会看到:

  • 第一次可能返回 {"status": "failed"}
  • 但5秒后,后台任务会自动重试,最终变成 success

✅ 这就是最终一致性:不要求立刻一致,但保证“最终”会一致!


如何让前端安全参与?避免重复提交!

很多同学问:“前端怎么配合?” 其实很简单:

  1. 前端生成唯一请求 ID(req_id),每次下单带上它
  2. 后端用 req_id 去重,防止用户狂点“下单”造成重复创建

在订单服务中加一个去重缓存:

processed_reqs = set()

@app.post("/create_order")
async def create_order(user_id: int, item_id: int, quantity: int, req_id: str):
    if req_id in processed_reqs:
        return {"error": "重复请求"}
    processed_reqs.add(req_id)
    # ...后续逻辑不变

前端示例(JavaScript):

const reqId = crypto.randomUUID(); // 生成唯一ID
fetch(`/create_order?user_id=1&item_id=101&quantity=2&req_id=${reqId}`);

这样即使用户连点10次,也只会处理一次!


借助 GPT-4 的 Function Calling 辅助设计

你可能会想:“这么多服务交互,怎么设计接口才合理?”

这时候可以用 GPT-4 的 Function Calling 来帮你梳理!

示例:让 GPT-4 生成库存扣减函数规范

# 定义函数描述(供GPT-4理解)
functions = [
    {
        "name": "deduct_inventory",
        "description": "扣减商品库存",
        "parameters": {
            "type": "object",
            "properties": {
                "item_id": {"type": "integer", "description": "商品ID"},
                "quantity": {"type": "integer", "description": "扣减数量"}
            },
            "required": ["item_id", "quantity"]
        }
    }
]

然后让 GPT-4 根据用户自然语言调用该函数:

用户输入:“给商品101减3个库存”

GPT-4 返回:

{"name": "deduct_inventory", "arguments": {"item_id": 101, "quantity": 3}}

你的后端收到后,直接执行对应逻辑即可。

💡 这样做可以让前端通过自然语言触发后端操作,同时保持接口规范统一,减少出错。


新手常见问题 & 避坑指南

❓ Q1:为什么不用数据库事务跨服务?

A:因为每个服务有自己的数据库,MySQL 的事务无法跨库!分布式事务必须用更高层的协调机制。

❓ Q2:重试会不会无限循环?

A:要加最大重试次数退避策略(比如第一次等5秒,第二次10秒,第三次30秒…)。否则可能压垮服务。

❓ Q3:消息队列不是更标准吗?为什么不用?

A:你说得对!生产环境确实用 RabbitMQ/Kafka 更可靠。但我们这是教学简化版,用内存+定时任务模拟消息队列的行为,让你先理解原理。掌握后再上消息队列就容易多了。

❓ Q4:补偿操作怎么写?比如 Saga 模式。

A:举个例子:如果“扣库存”成功但“发短信”失败,就要调用“加回库存”作为补偿。这需要你为每个正向操作写一个反向操作,复杂度较高,新手慎用。


下一步学习建议

  1. 深入消息队列:学 RabbitMQ 或 Kafka,把我们的“内存重试”替换成真正的消息投递。
  2. 了解 Seata:阿里开源的分布式事务框架,支持 AT/TCC/Saga 模式,Java 生态成熟,Python 也有社区版。
  3. 研究幂等性:所有接口必须幂等(重复调用结果不变),这是分布式系统的基石。
  4. 动手改造项目:尝试加入“支付服务”,形成三服务链路,挑战更复杂的场景。

写在最后

我当初学分布式事务时,也以为必须精通 2PC、Paxos 才能干活。后来发现,大多数业务用“本地事务 + 消息重试”就能搞定。先把简单方案用熟,再逐步进阶,才是高效学习之道。

希望这篇教程能帮你少走弯路。如果觉得有用,欢迎关注我的技术博客,我会持续更新“从零到上线”的实战系列!

🌟 记住:没有完美的方案,只有合适的方案。根据业务需求选择,比盲目追求“高大上”更重要。

Happy coding!

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝