从零构建实时日志聚合系统:技术探索与实践之路

朱超_后端
2025-06-16 01:57
阅读 318

引言

引言

大家好,我是一名在一线互联网公司做过多年后端开发和架构的技术负责人。这些年里,参与过不少大型项目的建设、优化以及运维工作。今天想和大家分享一个让我印象深刻的项目经历:我们团队是如何从零搭建一套实时日志聚合系统的。

这个项目并不是一开始就明确目标的“大工程”,而是在日常工作中逐渐暴露出来的痛点催生出的需求。希望借着这段经历,能给大家带来一些启发,特别是对那些正在考虑或者已经开始做类似平台建设的团队有些帮助。


背景与问题描述

背景与问题描述

几年前,我所在的公司正处于快速扩张期,业务模块越来越多,微服务数量翻倍增长。当时的日志管理方式还是以“每台服务器保留一份日志文件 + 手动 SSH 登录查看”为主。

这种模式一开始还能应对,但随着系统的复杂性提升,问题接踵而至:

  • 日志分散:不同服务部署在不同机器上,日志散落在多个节点中。
  • 查询困难:线上定位问题时,需要登录多台服务器查日志,效率极低。
  • 分析不便:无法进行集中统计分析,比如错误码分布、请求响应时间趋势等。
  • 缺乏统一规范:不同服务的日志格式差异很大,严重影响后期处理。

当时我们有同事尝试使用ELK(Elasticsearch + Logstash + Kibana)来做日志收集展示,但由于初期配置不当,性能堪忧,最终也没能落地。

于是,我们决定自己动手重新设计一个日志聚合系统,既要解决现有问题,又能支撑未来的发展。


技术方案的选择与思考

技术方案的选择与思考

初步调研阶段

为了找到合适的解决方案,我们先进行了为期两周的调研,重点围绕以下问题:

  1. 日志采集:如何高效地把每个节点上的日志实时采集到中心系统?
  2. 日志传输:如何避免网络瓶颈和延迟?是否支持压缩、缓冲机制?
  3. 存储与查询:选择哪种搜索引擎或数据库?是否支持结构化查询?
  4. 扩展性与稳定性:能否适应未来服务的增长?高可用怎么保障?

我们考察了几种主流组合,包括:

技术栈 优点 缺点
ELK 成熟、社区活跃 部署复杂、资源消耗大、Logstash性能差
Loki + Promtail 轻量级、适合云原生 查询能力弱、缺少高级分析功能
Fluentd + Elasticsearch 灵活、插件丰富 配置复杂、学习成本高
自研轻量日志收集器 + ClickHouse 定制化强 开发投入大、风险高

开发工具界面-1

综合评估后,我们选择了 Fluentd + Kafka + Elasticsearch + Kibana 的混合架构:

  • Fluentd负责本地日志采集与格式标准化;
  • Kafka作为消息中间件实现解耦和削峰填谷;
  • Elasticsearch用于高性能搜索和聚合查询;
  • Kibana提供可视化界面。

这套方案兼顾了灵活性、可扩展性和运维成熟度。


系统架构设计与实现思路

整体架构如下图所示:

+---------------------+
|       App Server     |
| (输出标准格式日志)   |
+----------+----------+
           |
           v
+---------+----------+
|     Fluentd Agent    |
| (运行在每台机器)   |
+---------+----------+
           |
           v
+---------+----------+
|        Kafka         |
| (topic: logs)      |
+---------+----------+
           |
           v
+---------+----------+
|      Log Collector   |
| (消费Kafka日志)    |
+---------+----------+
           |
           v
+---------+----------+
|    Elasticsearch     |
| (建立索引 & 存储)  |
+---------+----------+
           |
           v
+---------+----------+
|       Kibana         |
| (日志查询 & 分析)   |
+----------------------+

整个流程可以概括为:

  1. 应用将日志按标准格式写入本地文件。
  2. 每个节点运行Fluentd Agent,定时读取日志文件并转换成JSON。
  3. 经过初步过滤和标签添加后,发送到Kafka的logs topic。
  4. 一组Log Collector订阅logs topic,并进一步处理数据(如字段增强、IP归属地解析等),然后写入Elasticsearch。
  5. 最后通过Kibana进行可视化展示和自由查询。

这样做的好处是:

  • 组件之间解耦:即使某个环节临时不可用,也能通过Kafka缓存数据。
  • 弹性伸缩性强:Collector可以通过增加消费者来横向扩容。
  • 故障隔离好:各层出现问题都不会导致整个系统瘫痪。

关键代码片段分享

下面是我们在实际实现中几个比较关键的部分。

1. Fluentd Agent 配置示例(td-agent.conf

<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/td-agent/app.log.pos
  tag app.logs
  format json
</source>

<filter app.logs>
  @type record_transformer
  <record>
    host "#{Socket.gethostname}"
    service "app"
  </record>
</filter>

<match app.logs>
  @type kafka_buffered
  brokers "kafka01:9092,kafka02:9092"
  topic logs
  flush_interval 5s
  buffer_type file
  buffer_path /var/log/td-agent/buffer/kafka
</match>

注:这里的Fluentd使用了kafka plugin,并且做了buffer持久化以防宕机丢失。


2. Log Collector Python 示例(消费 Kafka 写入 ES)

from confluent_kafka import Consumer, KafkaException
from elasticsearch import Elasticsearch
import json

es = Elasticsearch(["es01:9200", "es02:9200"])

conf = {
    'bootstrap.servers': 'kafka01:9092',
    'group.id': 'log_collector_group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['logs'])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaException._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break


![开发工具界面-2](https://code-guide.oss.shanghai.autogptai.club/common/file/download?name=date2025061601/63e9e27b-7c70-41ce-ad2d-3c9d6f6fc89d.jpg)


    try:
        log_data = json.loads(msg.value().decode('utf-8'))
        
        # 增加地理位置信息
        ip = log_data.get('client_ip')
        geo_info = get_geo_from_ip(ip)  # 实际应调用第三方接口
        log_data.update(geo_info)

        es.index(index="logs-2024.08.15", body=log_data)
    except Exception as e:
        print(f"Error processing message: {e}")

注意事项:

  • 日志索引建议按天滚动,便于清理旧数据。
  • 使用批量写入ES会更高效,我们最终使用的是bulk API。
  • 实际中还加入了重试逻辑、熔断机制、异常监控等。

开发过程中踩过的坑

尽管前期准备充分,但在实际落地过程中,仍然遇到不少意料之外的问题。

🐾 坑一:Kafka分区不足导致堆积严重

起初我们只给logs这个topic分配了3个分区,结果上线不到一周就发现Consumer消费跟不上,导致大量消息积压。

原因:单个Consumer只能消费一个分区,在流量高峰期根本扛不住。

解决办法

  • 将分区数增加到60个;
  • 同时增加Consumer实例数,充分利用多核资源。

✅ 教训:预估日志吞吐量很重要,尤其是在分布式系统中。


🐾 坑二:Elasticsearch写入压力过大,节点频繁OOM

刚上线几天,经常出现某个ES节点内存爆掉的情况。经过排查,发现主要原因是:

  • 单个文档体积太大,有些日志包含完整请求体(Body),个别达到几MB。
  • 同一个Index下分片太多,默认设置了5个primary shard,后来变成累赘。

解决办法

  • 对日志内容做清洗,去掉冗余字段;
  • 修改索引策略,设置每天新建一个index(logs-YYYY.MM.DD);
  • 每个index的shard数量设为1,节省资源。

✅ 教训:合理规划索引分片和生命周期管理尤为重要。


🐾 坑三:Kibana权限混乱,误删历史数据

有一次某个实习生不小心删除了几个重要的Kibana index pattern,连带着历史数据也看不到了。

虽然底层数据还在,但恢复起来非常麻烦。

解决办法

  • 设置基于角色的权限控制(RBAC);
  • 定期导出Kibana对象配置(dashboards、visualizations、index patterns);
  • 使用版本控制工具跟踪变更。

成果与价值体现

系统上线半年后,整个日志体系发生了质的变化:

项目 上线前 上线后
日志查询平均耗时 >5分钟(人工查找) <1秒
错误追踪效率 多人协作 + 几小时定位 单人操作 + 实时图表
数据可视化能力 支持自定义仪表盘、报警规则
数据保存周期 最多7天 按业务分类,最长6个月
运维成本 高(需专人支持) 低(自动化巡检 + 告警)

更重要的是,这套日志系统成为了后续各种运维平台的基础组件之一,比如链路追踪、异常检测、业务运营分析等都从中受益。


经验与建议总结

结合这次经验,我想给同行们几点建议:

🧰 1. 技术选型不能唯“流行”,要结合业务特点

不要盲目追新,要问自己:“这套方案是否解决了我的核心问题?”、“维护成本能不能承受?”、“是否容易横向扩展?”等等。

🔍 2. 重视日志的标准化和规范化

日志格式越统一,后面的加工就越顺畅。最好一开始就制定清晰的Schema,包括哪些字段必填、命名规范、编码方式等。

📈 3. 提前做好容量规划

尤其对于大数据类系统,一定要预留足够的缓冲空间。提前预估日均日志量、最大峰值、数据保存周期,这决定了各个组件的资源配置。

🛡️ 4. 安全防护不能少

尤其是对外开放的查询接口,要严格限制访问权限,防止敏感信息泄露。日志中可能包含用户隐私、身份认证信息等,务必脱敏处理。

🧪 5. 做好测试与灰度上线

不要一上来就全部切换,应该先小范围试点,确保稳定后再逐步推广。同时记录关键指标变化,便于分析效果。


写在最后

回顾这次技术探索之旅,我深刻体会到:

“一个好的基础设施系统,不是靠堆叠先进技术实现的,而是从业务需求出发,一步步打磨出来的。”

技术本身从来都不是目的,它的价值体现在能真正解决实际问题,提高效率,释放人力。

如果你现在也在考虑搭建自己的日志分析系统,或者已经走在路上,欢迎留言交流,一起探讨更多细节。我们可以一起成长,少走弯路。

技术探索这条路没有终点,只有不断进化的过程。愿我们都保持好奇心,持续前行。


📦 文章配套资料:

评论 0

最热最新
暂无评论
匿名用户Lv.1
0
影响力
0
文章
0
粉丝