为什么Kafka能达到百万级TPS?为什么Kafka使用分区而不是队列?为什么Kafka适合大数据场景?
本文深度拆解Kafka的核心设计,揭示高性能的秘密。
一、Kafka核心架构
1.1 核心概念
Producer → Broker (Partition 0, 1, 2...) → Consumer Group
↓
磁盘存储(Segment文件)
核心组件:
- Producer(生产者):发送消息到Topic的Partition
- Broker(代理服务器):Kafka集群的节点,存储消息
- Topic(主题):消息的逻辑分类
- Partition(分区):Topic的物理分片,提升并行度
- Consumer Group(消费者组):多个Consumer组成的组,负载均衡消费
- Offset(偏移量):消息在Partition中的位置
1.2 为什么Kafka使用分区?
对比设计:
// RabbitMQ模型:Queue(单队列)
Queue: order.queue
├─ Message 1
├─ Message 2
├─ Message 3
...
限制:
- 单队列吞吐量受限(单机磁盘IO)
- 无法水平扩展
// Kafka模型:Topic + Partition(分区)
Topic: order
├─ Partition 0 → Broker 1
│ ├─ Message 1
│ ├─ Message 4
│ ...
├─ Partition 1 → Broker 2
│ ├─ Message 2
│ ├─ Message 5
│ ...
└─ Partition 2 → Broker 3
├─ Message 3
├─ Message 6
...
优势:
- 并行处理:多个Partition可以并行读写
- 水平扩展:增加Partition可以提升吞吐量
- 负载均衡:Partition分布在不同Broker
核心洞察:
分区是Kafka高性能的关键设计
- 并行度:N个分区 = N倍吞吐量
- 可扩展性:增加分区可以线性提升性能
1.3 Partition分区机制
/**
* Kafka Producer示例:发送消息到分区
*/
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// 方式1:不指定Key(轮询分配到分区)
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-topic", // Topic
"订单-" + i // Value
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送失败: " + exception.getMessage());
} else {
System.out.println("发送成功 → Partition: " + metadata.partition()
+ ", Offset: " + metadata.offset());
}
});
}
// 方式2:指定Key(根据Key的Hash值分配到分区)
for (int i = 0; i < 10; i++) {
String key = "user-" + (i % 3); // 3个用户
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-topic", // Topic
key, // Key(决定分区)
"订单-" + i // Value
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送失败: " + exception.getMessage());
} else {
System.out.println("Key: " + key + " → Partition: " + metadata.partition()
+ ", Offset: " + metadata.offset());
}
});
}
// 方式3:自定义分区器
// props.put("partitioner.class", "com.example.CustomPartitioner");
}
}
}
分区策略:
1. 未指定Key:轮询分配(Round-Robin)
Message 1 → Partition 0
Message 2 → Partition 1
Message 3 → Partition 2
Message 4 → Partition 0
...
2. 指定Key:Hash(Key) % Partition数量
Key = "user-1" → Hash = 100 → Partition 0
Key = "user-2" → Hash = 201 → Partition 1
Key = "user-1" → Hash = 100 → Partition 0(同一Key进入同一分区)
优势:同一Key的消息保证顺序性
二、高性能的秘密
2.1 秘密1:顺序写磁盘
问题:为什么Kafka使用磁盘还能达到百万级TPS?
答案:顺序写磁盘 > 随机写内存
性能对比:
- 随机写磁盘:100 KB/s
- 顺序写磁盘:600 MB/s(提升6000倍)
- 随机写内存:10 MB/s
- 顺序写内存:1 GB/s
结论:顺序写磁盘(600 MB/s)> 随机写内存(10 MB/s)
Kafka的顺序写实现:
// Kafka将消息追加到日志文件末尾(顺序写)
public class SegmentAppend {
private FileChannel fileChannel;
private long offset;
/**
* 追加消息到日志文件(顺序写)
*/
public void append(byte[] message) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(message);
// 顺序写到文件末尾
fileChannel.write(buffer, offset);
// 更新offset
offset += message.length;
}
}
2.2 秘密2:零拷贝技术(Zero Copy)
传统方式:4次拷贝 + 4次上下文切换
1. 磁盘 → 内核缓冲区(DMA拷贝)
2. 内核缓冲区 → 用户缓冲区(CPU拷贝)
3. 用户缓冲区 → Socket缓冲区(CPU拷贝)
4. Socket缓冲区 → 网卡(DMA拷贝)
总共:2次DMA拷贝 + 2次CPU拷贝
零拷贝方式:2次拷贝 + 2次上下文切换
1. 磁盘 → 内核缓冲区(DMA拷贝)
2. 内核缓冲区 → 网卡(DMA拷贝)
总共:2次DMA拷贝(省略了2次CPU拷贝)
Java实现(sendfile系统调用):
/**
* 零拷贝示例
*/
public class ZeroCopyExample {
/**
* 传统方式:4次拷贝
*/
public void traditionalCopy(File file, SocketChannel socketChannel) throws IOException {
try (FileInputStream fis = new FileInputStream(file)) {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
// 从磁盘读取到用户空间(2次拷贝)
socketChannel.write(ByteBuffer.wrap(buffer, 0, bytesRead));
// 从用户空间写入到网卡(2次拷贝)
}
}
}
/**
* 零拷贝方式:2次拷贝(使用transferTo)
*/
public void zeroCopy(File file, SocketChannel socketChannel) throws IOException {
try (FileChannel fileChannel = new FileInputStream(file).getChannel()) {
// 直接从磁盘发送到网卡(2次拷贝)
fileChannel.transferTo(0, fileChannel.size(), socketChannel);
}
}
}
性能提升:
传统方式:
- 4次拷贝
- 4次上下文切换
- CPU参与拷贝
零拷贝方式:
- 2次拷贝(减少50%)
- 2次上下文切换(减少50%)
- CPU不参与拷贝(CPU利用率提升)
结论:性能提升2-3倍
2.3 秘密3:批量发送(Batch)
问题:为什么批量发送能提升性能?
答案:减少网络IO次数
单条发送:
每条消息1次网络IO:
├─ Message 1 → 网络IO(1ms)
├─ Message 2 → 网络IO(1ms)
├─ Message 3 → 网络IO(1ms)
...
总耗时:1000条 × 1ms = 1000ms
批量发送:
100条消息1次网络IO:
├─ Message 1-100 → 网络IO(10ms)
├─ Message 101-200 → 网络IO(10ms)
...
总耗时:10批 × 10ms = 100ms(提升10倍)
Kafka Producer批量配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 批量发送配置
props.put("batch.size", 16384); // 批次大小:16KB
props.put("linger.ms", 10); // 等待时间:10ms
props.put("buffer.memory", 33554432); // 缓冲区:32MB
// 原理:
// Producer会等待10ms或积累16KB消息后批量发送
2.4 秘密4:压缩
// 压缩配置
props.put("compression.type", "lz4"); // 压缩算法:lz4(性能最好)
// 压缩率:
// - 原始大小:1MB
// - 压缩后:200KB(压缩率80%)
// - 网络传输:200KB(提升5倍)
三、ISR机制与副本同步
3.1 副本机制
Topic: order-topic(3个副本)
Partition 0
├─ Leader(Broker 1)← Producer写入、Consumer读取
├─ Follower(Broker 2)← 从Leader同步
└─ Follower(Broker 3)← 从Leader同步
ISR(In-Sync Replicas):与Leader保持同步的副本集合
ISR = {Broker 1(Leader),Broker 2,Broker 3}
如果Broker 3同步延迟过大:
ISR = {Broker 1(Leader),Broker 2}
3.2 消息确认机制(acks)
// acks配置
props.put("acks", "all"); // 等待所有ISR副本确认
// acks取值:
// - acks = 0:不等待确认(最快,可能丢失)
// - acks = 1:等待Leader确认(中等,Leader故障可能丢失)
// - acks = all:等待所有ISR副本确认(最慢,不会丢失)
对比:
| acks | 性能 | 可靠性 | 说明 |
|---|---|---|---|
| 0 | 最高 | 最低 | 不等待确认,可能丢失 |
| 1 | 中等 | 中等 | Leader确认,Leader故障可能丢失 |
| all | 最低 | 最高 | ISR全部确认,不会丢失 |
四、Consumer Group与分区分配
4.1 Consumer Group机制
Topic: order-topic(3个分区)
├─ Partition 0
├─ Partition 1
└─ Partition 2
Consumer Group 1(3个Consumer)
├─ Consumer 1 → Partition 0
├─ Consumer 2 → Partition 1
└─ Consumer 3 → Partition 2
Consumer Group 2(2个Consumer)
├─ Consumer 4 → Partition 0 + Partition 1
└─ Consumer 5 → Partition 2
规则:
- 每个Partition只能被Consumer Group中的1个Consumer消费
- 1个Consumer可以消费多个Partition
- Consumer数量 > Partition数量:有Consumer会空闲
4.2 分区分配策略
/**
* Kafka Consumer示例
*/
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group"); // Consumer Group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 分区分配策略
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor");
// - RangeAssignor:按范围分配(默认)
// - RoundRobinAssignor:轮询分配
// - StickyAssignor:粘性分配(尽量保持之前的分配)
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("order-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
// 处理消息
processMessage(record.value());
}
// 手动提交offset
consumer.commitSync();
}
}
}
private static void processMessage(String message) {
// 业务逻辑
}
}
五、Kafka vs RabbitMQ
| 特性 | Kafka | RabbitMQ |
|---|---|---|
| 设计目标 | 高吞吐日志系统 | 通用消息队列 |
| 吞吐量 | 百万级TPS | 万级TPS |
| 延迟 | 毫秒级 | 微秒级 |
| 消息模型 | 发布订阅 | 点对点 + 发布订阅 |
| 消息顺序 | 分区内有序 | 队列内有序 |
| 消息重试 | 不支持(需自己实现) | 支持(死信队列) |
| 消息回溯 | 支持(Offset) | 不支持 |
| 持久化 | 磁盘(日志) | 内存 + 磁盘 |
| 消息过滤 | 不支持 | 支持(Routing Key) |
| 适用场景 | 日志收集、大数据、流式计算 | 业务消息、任务队列 |
六、总结
Kafka高性能的核心设计:
- 分区机制:并行处理,线性扩展
- 顺序写磁盘:600 MB/s的顺序写性能
- 零拷贝技术:减少50%的拷贝次数
- 批量发送:减少网络IO次数
- 消息压缩:减少网络传输量
下一篇将深入RocketMQ的事务消息与实战应用。
作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(4/6)