自动化脚本的一些思考:从“能跑”到“跑得好”的成长之路

长安码客
2025-06-18 21:04
阅读 519

作为一名技术团队负责人,我经常在项目初期听到一句话:“这段流程我们先写个自动化脚本来处理。”这句话听起来简单,背后却隐藏着不少门道。自动化脚本的本质,是将重复性高、规则明确的任务交给程序来完成,从而释放人力、提升效率。但在实际执行过程中,如何写出一个既能跑、又能长期稳定运行的脚本,远比想象中要复杂得多

这篇文章我想结合自己过去几年的真实工作经验,尤其是几个关键项目的经历,谈谈我在设计和维护自动化脚本过程中遇到的问题、踩过的坑,以及逐步形成的一些思考。

为什么需要重新审视自动化脚本?

为什么需要重新审视自动化脚本?

在我职业生涯早期,曾参与一个日志分析平台的开发项目。这个平台的核心目标是采集多个业务系统的日志数据,并将其聚合到一个统一的数据仓库中,供后续分析使用。

最开始我们选择用 Python 写了一组批处理脚本,分别负责拉取日志、格式转换、上传到 Kafka、入库等步骤。起初这套方案运转良好,脚本运行一次成功一次,看起来没有问题。

但很快我们就意识到:这些看似“能跑”的脚本其实非常脆弱。当源服务器偶尔出现网络波动时,脚本直接报错退出;当文件名发生变化后,原本匹配的正则表达式失效,导致部分数据丢失;更有甚者,某次误操作覆盖了上一轮已经处理过的历史日志,造成了数据重复……

这让我第一次认真思考一个问题:自动化脚本不是“跑起来”就完事了,真正考验我们的是“能不能稳定地跑下去”

我们遇到了哪些挑战?

我们遇到了哪些挑战?

场景一:定时任务中的失败重试机制缺失

在上面提到的日志采集系统中,最初我们采用简单的 crontab 定时触发方式执行脚本。但由于网络请求、远程服务器响应慢等问题,脚本很容易因为某个环节失败而中断整个流程。

比如,有一个脚本需要调用第三方 API 获取访问令牌:

def get_access_token():
    response = requests.get("https://api.example.com/auth")
    return response.json()['token']

一旦该接口不稳定或返回异常结构,就会抛出 KeyError,脚本直接终止,后续的流程无法继续。这种“不可靠依赖”如果不在脚本层面做处理,就极易造成整体失败。

场景二:状态管理缺失导致任务重复执行

另一个问题是状态管理。由于我们缺乏对每次任务的状态记录,每次只能通过“是否已存在输出结果”来判断是否执行过。但这种方式并不可靠,有时是因为中间步骤失败导致结果不全,有时是因为误删了临时文件。

这就导致一些数据被重复处理,甚至出现数据污染的情况。

场景三:环境配置混乱带来的可移植性差

更糟的一点是,我们很多脚本严重依赖本地环境变量或路径硬编码。例如:

LOG_DIR = "/var/log/app_logs"
CONFIG_PATH = "/opt/scripts/config.yaml"

这带来两个问题:

  1. 脚本在不同机器上运行时行为不一致;
  2. 升级或修改路径时需要手动改动代码,易出错。

这些问题加在一起,使得我们的“自动化”反而成了运维噩梦——频繁的人工干预、反复排查错误、数据修复成本陡增。

我们的解决方案:构建可靠的自动化脚本体系

我们的解决方案:构建可靠的自动化脚本体系

为了解决这些问题,我们在后续迭代中逐步建立了一套更加健壮的自动化脚本架构,主要围绕以下几个核心理念展开:

1. 分阶段处理 + 状态追踪

我们将整个任务拆分成若干个独立阶段,每个阶段完成后记录状态(如 Redis、数据库或本地元数据文件),这样可以做到:

  • 中断后从上次成功节点恢复;
  • 每一步都有进度可查;
  • 明确知道哪一部分失败。

以日志采集为例,拆分为以下阶段:

阶段 说明
Stage 0 检查远程服务器日志文件是否存在
Stage 1 下载日志文件
Stage 2 解析日志内容并格式化
Stage 3 发送到 Kafka

每完成一个阶段,更新状态标记,下次运行时跳过已完成的部分。

代码质量检测-2

2. 错误容忍与自动重试

对于外部依赖,我们加入 retry 机制和 fallback 处理。例如上面提到的获取 token 接口:

from tenacity import retry, stop_after_attempt, wait_fixed

@retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
def get_access_token():
    try:
        response = requests.get("https://api.example.com/auth", timeout=10)
        return response.json()['token']
    except (KeyError, requests.RequestException) as e:
        logging.error(f"Get token failed: {e}")
        raise

项目管理工具-1

这里用了 tenacity 这个库,实现最多尝试三次、间隔5秒的自动重试机制,显著提升了对外部服务抖动的容忍度。

3. 引入轻量级调度框架

为了更好地管理任务状态和执行流程,我们也开始引入像 PrefectAirflow 这样的轻量级调度工具。它们不仅提供了可视化的任务视图,还支持任务分发、依赖管理和通知机制。

举个例子,我们后来将整个日志采集流程迁移至 Prefect 后,代码变得清晰易读:

from prefect import task, Flow

@task
def download_log_files():
    ...

@task
def parse_logs(raw_data):
    ...

with Flow("Daily Log Processing") as flow:
    raw_log_files = download_log_files()
    parsed_data = parse_logs(raw_log_files)

flow.run()

而且可以通过 UI 查看每个任务的状态、耗时、日志等信息,极大方便了问题追踪。

4. 动态配置与环境隔离

我们不再将配置硬编码进脚本中,而是引入 .env 文件,并使用 dotenv 库加载配置:

import os
from dotenv import load_dotenv

load_dotenv()

LOG_DIR = os.getenv("LOG_DIR", "/default/log/path")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "default_topic")

同时鼓励使用 Docker 容器打包脚本及其依赖环境,确保“在哪都一样跑”。

开发实践:一个真实案例

开发实践:一个真实案例

接下来我想分享一个具体的项目场景,展示我们是如何一步步改进脚本的。

项目背景

公司有一套内部的审批系统,用户在提交申请后,需要人工逐层审批。随着组织规模扩大,审批任务越来越多,HR 团队希望能有个系统自动识别超期未审批的请求,并发送提醒邮件给相关责任人。

于是我们决定用 Python 编写一个自动化脚本,每天凌晨扫描所有待审批事项,识别超时项,然后通过企业邮箱发送提醒邮件。

初版脚本存在的问题

第一个版本非常简单粗暴,逻辑如下:

  1. 使用 requests 请求审批系统接口,获取所有待审批事项;
  2. 遍历每一项,判断是否超过设置时间(比如72小时);
  3. 构建提醒邮件并发送给上级审批人;
  4. 将处理过的 ID 记录在一个本地文件中,防止重复提醒。

很快我们就发现:

  • 某些审批人离职后,脚本仍试图向其发送邮件,造成垃圾邮件;
  • 如果当日运行失败,第二天会重复发送提醒;
  • 日志不够详细,出了问题根本不知道哪里挂了。

改进思路

我们随后做了几点重要调整:

✅ 增加数据清洗层

在拿到审批数据后,我们新增了一个清洗步骤,过滤掉无效人员(如已离职、权限变更等),这部分通过连接 HR 系统查询验证:

def is_valid_approver(approver_id):
    # 查询员工状态是否有效
    status = hr_system.get_employee_status(approver_id)
    return status == 'active'

✅ 使用 Redis 做去重缓存

我们将原本写在本地文本文件的“已发送ID”,换成 Redis 的 SET 结构,支持 TTL(有效期控制)和跨实例共享:

import redis

r = redis.Redis(host='redis-host')

def has_been_sent(request_id):
    return r.sismember('sent_reminders', request_id)

def mark_as_sent(request_id):
    r.sadd('sent_reminders', request_id)

✅ 增加日志输出和错误通知

我们增加了详细的日志记录,并在脚本末尾添加 Slack 通知:

import logging
import slackweb

logging.basicConfig(level=logging.INFO, filename='reminder.log', format='%(asctime)s - %(levelname)s - %(message)s')

...

def notify_slack(msg):
    slack = slackweb.Slack(url=os.getenv("SLACK_WEBHOOK_URL"))
    slack.notify(text=msg)

踩过的坑:别忽视细节的力量

在整个自动化脚本建设过程中,我们踩过不少坑,也积累了一些宝贵经验。

❌ 网络请求的默认 timeout 设置太长或没有设置

有一次线上环境部署后,一个脚本卡住了两个小时才报错。最终发现是因为某个 HTTP 请求没有设置 timeout,而目标服务器恰好处于不可达状态,导致整个脚本“卡死”。

教训:任何涉及 I/O 的操作必须设超时机制,并合理控制重试次数

response = requests.get(url, timeout=(3.05, 27))  # connect timeout and read timeout

❌ 忽略环境变量差异

我们在本地测试一切正常,放到服务器上却始终报错找不到配置文件。后来才发现是环境变量名称写错了,比如 KAFKA_BROKERKAFKA_BOOTSTRAP_SERVERS 不一致。

教训:配置命名要规范统一,最好通过 CI/CD 流程验证配置完整性

❌ 脚本并发执行导致冲突

我们曾经上线过一个自动部署脚本,用于在多台服务器上并行部署应用。但因多个实例同时操作共享目录,导致某些文件被覆盖、锁未释放等问题。

教训:若需并发,请务必加上分布式锁机制(如 Redis Lock、Zookeeper 等)

效果总结:自动化带来的收益不止于效率提升

经过这一系列改造,我们的自动化脚本逐渐走向成熟,带来了以下几个方面的明显收益:

🕒 效率提升明显

审批提醒脚本上线后,审批平均完成时间从原来的96小时缩短到48小时内。HR 团队反馈人工催办频率下降了 70%。

🔧 可维护性增强

借助状态跟踪和日志机制,我们能在第一时间发现问题,并快速定位影响范围。

💡 更强扩展能力

后来我们将这套脚本模式迁移到其他类似场景,包括订单超时提醒、工单自动派发、日志归档清理等等,大大降低了新需求的落地周期。

给读者的建议:让脚本“活下来”

如果你也在日常工作中大量使用自动化脚本,这里是我想给你的一些建议:

1. 不要怕重构,但要善用抽象

很多时候我们觉得“能跑就行”,直到出问题才追悔莫及。其实一开始就把结构设计得清晰一点,后面改起来更容易。

比如将脚本拆分成函数或模块、封装通用逻辑、避免全局变量等。

2. 拥抱调度平台,告别 crontab 黑暗时代

crontab 适合简单的周期任务,但对于现代业务场景来说远远不够。推荐你尝试像 Airflow、Prefect、Luigi 这样的调度框架,它们不仅可以编排任务,还能帮助你理解数据流和任务依赖关系。

3. 日志和通知,是故障排查的第一道防线

脚本再小,也请加上基本的日志输出。出问题的时候,你会感谢那个写了 logging.info() 的自己。

4. 测试不是可选项,是必须项

即使只是个小脚本,也要考虑 mock 数据测试、边界条件测试、错误模拟等场景。否则上线之后,就是一场灾难。

5. 把自动化当作产品来对待

当你把它当成一个“产品”而不是临时任务来维护时,你就自然会思考它的稳定性、可观测性、可扩展性,也会更愿意投入精力去做持续优化。


最后:从“程序员”到“工程师”的转变

写到这里,我想起一位前辈说的:“写脚本能解决问题,但写好脚本才能证明你是个真正的工程师。”

在经历了无数次失败与重构之后,我越发意识到:所谓技术成长,不是你能写出多么酷炫的算法,而是你能写出一个经得起时间考验、别人接手也能顺利运行的脚本

这或许也是我们从“程序员”成长为“工程师”的必经之路。

希望这篇文章能够对你有所启发。也欢迎留言交流你们在自动化脚本实践中遇到的问题和经验。

一起在“能跑”的基础上,写出“跑得好”的脚本吧!

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝