Skip to main content

如何做到 Apache Kafka 100% 协议兼容

引言

开源界有很多适配了 Kafka 协议的产品,例如 RedpandaKafka on Pulsar。它们有的是从 0 开始重新构建 Kafka,有的是基于已有的产品来进行协议的嫁接。目前 Kafka 协议有 113 个 ErrorCode、68 个 API,仅是 Fetch API 就有 15 个版本,去实现 Kafka 协议和语义的 100% 兼容是极其困难的。并且后期随着 Apache Kafka 的发展,如何持续保持和 Kafka 协议的兼容也是一大挑战。

Kafka 的协议和语义的兼容性是用户选择 Kafka 产品的一个重要考量,因此 AutoMQ for Kafka(简称 AutoMQ Kafka) 架构设计的前提就是必须 100% 兼容 Apache Kafka 的协议和语义,并且能持续的跟进和对齐 Apache Kafka。

100% API 兼容

Apache Kafka 已经经过 10 余年的发展,由 1000+ Contributors 共同贡献了 1019 个 KIP,整个代码库截止目前(2024-02-23 06392f7ae2)已有 885,981 行代码,沉淀了大量的功能特性、优化和修复。如果要从零开始构建一个 API 协议和语义兼容的 Kafka 不仅开发工作量大,并且极易出错。Apache Kafka 架构由计算层和存储层构成:

  • 计算层:代码总量的 98%,承载了 Kafka 的 API 协议和功能特性,主要的开销为处理消息收发的 CPU 资源消耗。由于 Apache Kafka 良好的消息攒批和 API 请求攒批,2 核就能支撑 1GB/s 的流量,已经将 CPU 消耗降低到了极致;

  • 存储层:代码总量的 1.97%,仅有 17,532 行代码,负责消息的高可靠存储。Apache Kafka 作为流处理管道会长期存储大量数据,Apache Kafka 集群成本的大部分是由数据存储成本和存算一体部署的机器成本组成;

AutoMQ Kafka 的目标是将 Kafka 的成本降低 10x,那么主要优化点集中在存储层的云原生化。因此 AutoMQ Kafka 的核心思路是通过存算分离的架构来对 Apache Kafka 进行云原生重构:

  • 既可以复用 98% Apache Kafka 计算层代码,保障 API 的协议 & 语义兼容和功能对齐;

  • 又可以将存储层替换为云原生的存储服务,实现 Kafka 的 Serverless 化和 10x 降本;

Apache Kafka

在介绍 100% API 兼容方案之前,先来回顾一下 Apache Kafka 的模块层次。Apache Kafka 从南北向流量的处理层依次为:

  • 网络层:负责管理连接、从客户端接受网络包解码成请求和把响应编码成网络包发送给客户端;

  • 分发层:接收到请求后,KafkaApis 根据请求的 ApiKey 将请求分发到具体的业务逻辑处理模块;

  • 业务逻辑层:拆分成更细的子模块处理不同的业务逻辑。ReplicaManager 负责消息收发和分区管理;Coordinator 负责消费者管理和事务消息;Kraft 负责集群元数据;

  • 存储层:负责数据的高可靠存储,对业务逻辑层提供 Partition 抽象。自顶向下划分多个层次:UnifiedLog 负责 Log 的 ISR 多副本复制保障数据的高可靠;LocalLog 负责本地数据存储,提供一个“无限”的流存储抽象;LogSegment 作为 Kafka 的最小存储单元,将 LocalLog 切分成数据段映射到对应的物理文件;

以 Apache Kafka 处理消息 Produce 为例:

  1. 网络层将网络包解析成 ProduceRequest;

  2. 然后 KafkaApis 根据 ApiKey.PRODUCE 将请求分发给 ReplicaManager 处理;

  3. ReplicaManager#appendRecords 找到对应的 Partition;

  4. Partition#appendRecordsToLeader 最终调用到 LocalLog,LocalLog 将消息写到 Active Segment 中;

  5. LogSegment 将消息持久化到数据文件中,并构建对应的 index、timeindex 和 txnindex 索引;

其他业务逻辑层模块,例如事务 Coordinator、消费组 Coordinator、Kraft 元数据基本上也都是围绕着 Partition(Log)构建起来的。

AutoMQ Kafka

前面提到 AutoMQ Kafka 是存算分离的架构。在存储层 AutoMQ Kafka 抽象出 S3Stream 流存储库来替代 Apache Kafka 的 Log 本地存储。存储层对上暴露相同的 Partition 抽象,上层 Kraft 元数据管理、 Coordinator、ReplicaManager、KafkaApis 等模块均可以复用原有的代码逻辑。存储层上层复用原有的逻辑,使得 AutoMQ Kafka 不仅可以轻松做到 100% 的协议和语义兼容,还可以持续跟进 Apache Kafka 的最新功能和缺陷修复。

S3Stream

S3Stream 基于云盘和对象存储构建了低延迟、高吞吐、低成本的 Stream 抽象。在 API 层面核心的两个方法 append 和 fetch 分别提供 stream 的写入和读取。相比 Kafka 的 Log 会更加纯粹,没有事务索引、时间戳索引和 Compact 等功能,S3Stream 更加聚焦于流的存储,不关心上层的业务逻辑。


interface Stream {
CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch);

CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint);

CompletableFuture<Void> trim(long newStartOffset);

// others
}

interface RecordBatch {
// records count, it's usually equal to Kafka RecordBatch.count
// and it also could be used as offset padding for compacted topic
int count();

ByteBuffer rawPayload();
}

既然 S3Stream 的能力和 Kafka 的 Log 没有对齐,那么 AutoMQ Kafka 是如何做到存储层替换的呢?这就和 AutoMQ Kafka 巧妙的存储切面有关了。

存储切面

在介绍 AutoMQ Kafka 的存储切面之前。先来简要探索一下 Apache Kafka Compact Topic 的 Compact 逻辑:

  1. LogCleaner 定期的将 Compact Topic 的分区进行 Compact;

  2. 首先会将分区的非活跃 Segment 进行分组;

  3. 将每组中有效的数据扫描出来,并写入到一个新的 Segment.cleaned 中;

  4. 最后使用新的 Segment 替换掉老的 Segment,完成 Compact;

可以发现 Kafka 虽然对业务逻辑层是以 Partition 暴露的连续的流抽象,但是内部实现 Compact 逻辑的时候是以 Segment 为最小存储维度来操作的。同样 Kafka 的 Log 恢复、事务索引、时间戳索引和读取都是基于 Segment 来操作的。因此 AutoMQ Kafka 的存储切面核心也是 Segment 维度的,只需要实现 Segment 的语义就能复用上层 LocalLog、LogCleaner 和 Partition 的逻辑,进一步保障存储层逻辑和 Apache Kafka 一致。

在 Apache Kafka 中,一个分区下面有两类数据:

  • Partition 维度包含 Producer 幂等信息快照数据 xxx.snapshot、leader epoch 信息 leader-epoch-checkpoint 等等元数据;

  • Segment 里面包含数据文件 xxx.data、稀疏索引文件 xxx.index、事务索引文件 xxx.tnxindex 和时间索引文件 xxx.timeindex;

AutoMQ Kafka 为了将 Kafka 变成无状态,会将这些文件的数据都下沉到 S3Stream:

  • Meta:Meta S3Stream 提供一个类 KV 的语义,存储 Partition 维度的元数据。Apache Kafka 可以通过文件系统目录树扫描出 Partition 下的 Segment 列表,在 AutoMQ Kafka 中会在 Meta S3Stream 通过 ElasticLogMeta 记录 Segment 列表和 Segment 与 Stream 的映射关系;

  • Data:S3Stream API 已经提供根据逻辑位点查询数据的能力,因此 xxx.data 和 xxx.index 可以一起被 Data S3Stream 替换;

  • Txn/Time:等价替换原有的 xxx.tnxindex 和 xxx.timeindex;

Segment 是有界的数据段,会随着大小和时间进行滚动,如果 Segment 下的每个文件都映射成一个 Stream,那么 Stream 的会膨胀得很快。因此 AutoMQ Kafka 将 Stream 逻辑切分成 Slice 映射到 Segment 的文件,一个 Partition 的固定开销限制在 3~7 个 Stream,最终的映射表达方式类似于:


{
"streamMap": {
"log": 100, // stream id
"time": 101,
...
}
"segments": [
{
"baseOffset": 0, // segment base offset
"streamSuffix": "", // if the suffix is .cleaned, means the segment is created from compaction, and the under data stream key is log.cleaned
"log": { "start": 0, "end": 2 }, // stream slice
"time": { "start": 0, "end": 12 },
...
},
{
"baseOffset": 2, // segment base offset
"streamSuffix": "",
"log": { "start": 2, "end": 5 },
...
},
{
"baseOffset": 5, // segment base offset
"streamSuffix": "",
"log": { "start": 5 "end": -1 }, // end = -1 represent it's the active segment
...
},
]
}

以上面的映射关系为例:分区下有三个 Segments:Segment-0、Segment-2 和 Segment-5。

  • Segment-0 的持有 [0, 2) 段的消息,数据映射到 Stream 也是 [0, 2)。读取分区 [0, 2) 的数据,会映射成读取 DataStream#fetch(0, 2);

  • 其中 Segment-5 为活跃的 Segment,分区写入新的数据的 baseOffset 分配为 8;

在前面提到的 Compact Topic 场景中,假设 Segment-0 和 Segment-2 会被 Compact 成 Segment-0.cleaned。Segment 的 baseOffset 为 0,映射到 Data(log.cleaned) Stream 中的 [0, 5) 段。为了保证可以在 Stream 中连续寻址,其中 Kafka 中 offset = 1 的消息会映射到为 RecordBatch{offset=0, count=2},count=2 是为了填补 offset = 0 被 compacted 后的空洞。同理 Kafka 中 offset = 2 的消息被映射成 RecordBatch{offset=2, count=1}、offset = 4 的消息被映射成 RecordBatch{offset=3, count=2}。

Segment0.cleaned 替换 Segment-0/2 和原有的 Segment-5 组成新的 Segments 列表,最终生成的 ElasticLogMeta 为:


{
"streamMap": {
"log": 100, // stream id
"time": 101,
"log.cleaned": 102,
...
}
"segments": [
{
"baseOffset": 0, // segment base offset
"streamSuffix": ".cleaned", // if the suffix is .cleaned, means the segment is created from compaction, and the under data stream key is log.cleaned
"log": { "start": 0, "end": 5 }, // stream slice
...
},
{
"baseOffset": 5, // segment base offset
"streamSuffix": "",
"log": { "start": 5, "end": -1 }, // end = -1 represent it's the active segment
...
},
]
}

通过这种映射的方式,轻量的完成了 Kafka 中本地存储文件到 S3Stream 的转换,除 Segment 外复用大部分存储层的逻辑,实现存储层的语义兼容。

质量保障

AutoMQ Kafka 除了在架构设计层面从理论上做到 100% 的 API 兼容以外,AutoMQ Kafka 还全部通过 Apache Kafka 的 387 个系统测试用例集(Kraft 模式)。该用例集覆盖了 Kafka 功能(消息收发、消费者管理、Topic Compaction 等等)、客户端兼容性(>= 0.9 )、运维(分区迁移、滚动重启等等)、Stream 和 Connector 等各个方面的测试,从实际运行层面确保了 AutoMQ Kafka 的 100% 协议和语义兼容。

未来规划

得益于 AutoMQ Kafka 只修改了存储切面,合并跟进 Apache Kafka 代码成本很低:

  • AutoMQ Kafka 计划在 24 年 4 月份合并 Apache Kafka 的代码,将内核从 3.4.0 升级到 3.6.0;

  • 后续 AutoMQ Kafka 规划采用 T + 1 Month 的模式合并 Apache Kafka 代码,持续跟进 Apache Kafka 的新特性和稳定性修复;