为什么Kafka能达到百万级TPS?为什么Kafka使用分区而不是队列?为什么Kafka适合大数据场景?

本文深度拆解Kafka的核心设计,揭示高性能的秘密。


一、Kafka核心架构

1.1 核心概念

Producer → Broker (Partition 0, 1, 2...) → Consumer Group
               ↓
            磁盘存储(Segment文件)

核心组件

  1. Producer(生产者):发送消息到Topic的Partition
  2. Broker(代理服务器):Kafka集群的节点,存储消息
  3. Topic(主题):消息的逻辑分类
  4. Partition(分区):Topic的物理分片,提升并行度
  5. Consumer Group(消费者组):多个Consumer组成的组,负载均衡消费
  6. Offset(偏移量):消息在Partition中的位置

1.2 为什么Kafka使用分区?

对比设计

// RabbitMQ模型:Queue(单队列)
Queue: order.queue
    ├─ Message 1
    ├─ Message 2
    ├─ Message 3
    ...

限制
- 单队列吞吐量受限单机磁盘IO
- 无法水平扩展

// Kafka模型:Topic + Partition(分区)
Topic: order
    ├─ Partition 0  Broker 1
       ├─ Message 1
       ├─ Message 4
       ...
    ├─ Partition 1  Broker 2
       ├─ Message 2
       ├─ Message 5
       ...
    └─ Partition 2  Broker 3
        ├─ Message 3
        ├─ Message 6
        ...

优势
- 并行处理多个Partition可以并行读写
- 水平扩展增加Partition可以提升吞吐量
- 负载均衡Partition分布在不同Broker

核心洞察

分区是Kafka高性能的关键设计

  • 并行度:N个分区 = N倍吞吐量
  • 可扩展性:增加分区可以线性提升性能

1.3 Partition分区机制

/**
 * Kafka Producer示例:发送消息到分区
 */
public class KafkaProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            // 方式1:不指定Key(轮询分配到分区)
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "order-topic",  // Topic
                        "订单-" + i     // Value
                );

                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("发送失败: " + exception.getMessage());
                    } else {
                        System.out.println("发送成功 → Partition: " + metadata.partition()
                                + ", Offset: " + metadata.offset());
                    }
                });
            }

            // 方式2:指定Key(根据Key的Hash值分配到分区)
            for (int i = 0; i < 10; i++) {
                String key = "user-" + (i % 3);  // 3个用户
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "order-topic",  // Topic
                        key,            // Key(决定分区)
                        "订单-" + i     // Value
                );

                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("发送失败: " + exception.getMessage());
                    } else {
                        System.out.println("Key: " + key + " → Partition: " + metadata.partition()
                                + ", Offset: " + metadata.offset());
                    }
                });
            }

            // 方式3:自定义分区器
            // props.put("partitioner.class", "com.example.CustomPartitioner");
        }
    }
}

分区策略

1. 未指定Key:轮询分配(Round-Robin)
   Message 1 → Partition 0
   Message 2 → Partition 1
   Message 3 → Partition 2
   Message 4 → Partition 0
   ...

2. 指定Key:Hash(Key) % Partition数量
   Key = "user-1" → Hash = 100 → Partition 0
   Key = "user-2" → Hash = 201 → Partition 1
   Key = "user-1" → Hash = 100 → Partition 0(同一Key进入同一分区)

优势:同一Key的消息保证顺序性

二、高性能的秘密

2.1 秘密1:顺序写磁盘

问题:为什么Kafka使用磁盘还能达到百万级TPS?

答案顺序写磁盘 > 随机写内存

性能对比:
- 随机写磁盘:100 KB/s
- 顺序写磁盘:600 MB/s(提升6000倍)
- 随机写内存:10 MB/s
- 顺序写内存:1 GB/s

结论:顺序写磁盘(600 MB/s)> 随机写内存(10 MB/s)

Kafka的顺序写实现

// Kafka将消息追加到日志文件末尾(顺序写)
public class SegmentAppend {

    private FileChannel fileChannel;
    private long offset;

    /**
     * 追加消息到日志文件(顺序写)
     */
    public void append(byte[] message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message);

        // 顺序写到文件末尾
        fileChannel.write(buffer, offset);

        // 更新offset
        offset += message.length;
    }
}

2.2 秘密2:零拷贝技术(Zero Copy)

传统方式:4次拷贝 + 4次上下文切换

1. 磁盘 → 内核缓冲区(DMA拷贝)
2. 内核缓冲区 → 用户缓冲区(CPU拷贝)
3. 用户缓冲区 → Socket缓冲区(CPU拷贝)
4. Socket缓冲区 → 网卡(DMA拷贝)

总共:2次DMA拷贝 + 2次CPU拷贝

零拷贝方式:2次拷贝 + 2次上下文切换

1. 磁盘 → 内核缓冲区(DMA拷贝)
2. 内核缓冲区 → 网卡(DMA拷贝)

总共:2次DMA拷贝(省略了2次CPU拷贝)

Java实现(sendfile系统调用)

/**
 * 零拷贝示例
 */
public class ZeroCopyExample {

    /**
     * 传统方式:4次拷贝
     */
    public void traditionalCopy(File file, SocketChannel socketChannel) throws IOException {
        try (FileInputStream fis = new FileInputStream(file)) {
            byte[] buffer = new byte[4096];
            int bytesRead;

            while ((bytesRead = fis.read(buffer)) != -1) {
                // 从磁盘读取到用户空间(2次拷贝)
                socketChannel.write(ByteBuffer.wrap(buffer, 0, bytesRead));
                // 从用户空间写入到网卡(2次拷贝)
            }
        }
    }

    /**
     * 零拷贝方式:2次拷贝(使用transferTo)
     */
    public void zeroCopy(File file, SocketChannel socketChannel) throws IOException {
        try (FileChannel fileChannel = new FileInputStream(file).getChannel()) {
            // 直接从磁盘发送到网卡(2次拷贝)
            fileChannel.transferTo(0, fileChannel.size(), socketChannel);
        }
    }
}

性能提升

传统方式:
- 4次拷贝
- 4次上下文切换
- CPU参与拷贝

零拷贝方式:
- 2次拷贝(减少50%)
- 2次上下文切换(减少50%)
- CPU不参与拷贝(CPU利用率提升)

结论:性能提升2-3倍

2.3 秘密3:批量发送(Batch)

问题:为什么批量发送能提升性能?

答案减少网络IO次数

单条发送:
每条消息1次网络IO:
├─ Message 1 → 网络IO(1ms)
├─ Message 2 → 网络IO(1ms)
├─ Message 3 → 网络IO(1ms)
...
总耗时:1000条 × 1ms = 1000ms

批量发送:
100条消息1次网络IO:
├─ Message 1-100 → 网络IO(10ms)
├─ Message 101-200 → 网络IO(10ms)
...
总耗时:10批 × 10ms = 100ms(提升10倍)

Kafka Producer批量配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

// 批量发送配置
props.put("batch.size", 16384);  // 批次大小:16KB
props.put("linger.ms", 10);      // 等待时间:10ms
props.put("buffer.memory", 33554432);  // 缓冲区:32MB

// 原理:
// Producer会等待10ms或积累16KB消息后批量发送

2.4 秘密4:压缩

// 压缩配置
props.put("compression.type", "lz4");  // 压缩算法:lz4(性能最好)

// 压缩率:
// - 原始大小:1MB
// - 压缩后:200KB(压缩率80%)
// - 网络传输:200KB(提升5倍)

三、ISR机制与副本同步

3.1 副本机制

Topic: order-topic(3个副本)
    Partition 0
        ├─ Leader(Broker 1)← Producer写入、Consumer读取
        ├─ Follower(Broker 2)← 从Leader同步
        └─ Follower(Broker 3)← 从Leader同步

ISR(In-Sync Replicas):与Leader保持同步的副本集合

ISR = {Broker 1(Leader),Broker 2,Broker 3}

如果Broker 3同步延迟过大:
ISR = {Broker 1(Leader),Broker 2}

3.2 消息确认机制(acks)

// acks配置
props.put("acks", "all");  // 等待所有ISR副本确认

// acks取值:
// - acks = 0:不等待确认(最快,可能丢失)
// - acks = 1:等待Leader确认(中等,Leader故障可能丢失)
// - acks = all:等待所有ISR副本确认(最慢,不会丢失)

对比

acks性能可靠性说明
0最高最低不等待确认,可能丢失
1中等中等Leader确认,Leader故障可能丢失
all最低最高ISR全部确认,不会丢失

四、Consumer Group与分区分配

4.1 Consumer Group机制

Topic: order-topic(3个分区)
    ├─ Partition 0
    ├─ Partition 1
    └─ Partition 2

Consumer Group 1(3个Consumer)
    ├─ Consumer 1 → Partition 0
    ├─ Consumer 2 → Partition 1
    └─ Consumer 3 → Partition 2

Consumer Group 2(2个Consumer)
    ├─ Consumer 4 → Partition 0 + Partition 1
    └─ Consumer 5 → Partition 2

规则

  1. 每个Partition只能被Consumer Group中的1个Consumer消费
  2. 1个Consumer可以消费多个Partition
  3. Consumer数量 > Partition数量:有Consumer会空闲

4.2 分区分配策略

/**
 * Kafka Consumer示例
 */
public class KafkaConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-consumer-group");  // Consumer Group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 分区分配策略
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RangeAssignor");
        // - RangeAssignor:按范围分配(默认)
        // - RoundRobinAssignor:轮询分配
        // - StickyAssignor:粘性分配(尽量保持之前的分配)

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("order-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                            record.partition(), record.offset(), record.key(), record.value());

                    // 处理消息
                    processMessage(record.value());
                }

                // 手动提交offset
                consumer.commitSync();
            }
        }
    }

    private static void processMessage(String message) {
        // 业务逻辑
    }
}

五、Kafka vs RabbitMQ

特性KafkaRabbitMQ
设计目标高吞吐日志系统通用消息队列
吞吐量百万级TPS万级TPS
延迟毫秒级微秒级
消息模型发布订阅点对点 + 发布订阅
消息顺序分区内有序队列内有序
消息重试不支持(需自己实现)支持(死信队列)
消息回溯支持(Offset)不支持
持久化磁盘(日志)内存 + 磁盘
消息过滤不支持支持(Routing Key)
适用场景日志收集、大数据、流式计算业务消息、任务队列

六、总结

Kafka高性能的核心设计:

  1. 分区机制:并行处理,线性扩展
  2. 顺序写磁盘:600 MB/s的顺序写性能
  3. 零拷贝技术:减少50%的拷贝次数
  4. 批量发送:减少网络IO次数
  5. 消息压缩:减少网络传输量

下一篇将深入RocketMQ的事务消息与实战应用。


作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(4/6)