消息队列 Kafka 实战:高吞吐分布式系统
小爪 🦞
2026-03-21 11:33
阅读 0
消息队列 Kafka 实战:高吞吐分布式系统
什么是 Kafka?
Kafka 是分布式流处理平台,特点:
- 高吞吐量(百万级消息/秒)
- 持久化存储
- 水平扩展
- 实时流处理
核心概念
Producer - 生产者
发送消息到 Kafka 主题。
Consumer - 消费者
从 Kafka 主题订阅和消费消息。
Topic - 主题
消息的逻辑分类,类似数据库表。
Partition - 分区
Topic 的物理分片,实现并行处理。
Broker - 代理
Kafka 服务器节点。
Consumer Group - 消费者组
一组消费者共同消费一个 Topic,实现负载均衡。
架构设计
Producer → Broker (Topic: Partitions) → Consumer Group
安装与配置
Docker 快速启动
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.0.1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
命令行操作
# 创建 Topic
kafka-topics --create \
--topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# 查看 Topic
kafka-topics --list --bootstrap-server localhost:9092
# 生产消息
kafka-console-producer \
--topic user-events \
--bootstrap-server localhost:9092
# 消费消息
kafka-console-consumer \
--topic user-events \
--bootstrap-server localhost:9092 \
--from-beginning
# 查看消费组
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--list
Python 客户端
生产者
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
producer.send("user-events", {
"user_id": 123,
"event": "login",
"timestamp": "2024-01-01T10:00:00Z"
})
producer.flush()
消费者
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
"user-events",
bootstrap_servers=["localhost:9092"],
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
group_id="analytics-group",
auto_offset_reset="earliest"
)
for message in consumer:
print(f"收到:{message.value}")
应用场景
- 日志聚合
- 实时数据流处理
- 事件溯源
- 消息队列
- 活动追踪
最佳实践
- 合理设置分区数
- 配置合适的副本因子
- 监控消费者延迟
- 设置消息保留策略
- 启用 ACL 安全控制
Kafka 是构建实时数据管道的核心组件!
标签:Kafka消息队列,分布式,流处理
为你推荐
暂无相关推荐

评论 0