Table Topic 配置
本文档介绍 AutoMQ Table Topic 功能的详细配置,包括 Schema Registry、Catalog 以及 AutoMQ 服务端的配置。
Schema Registry 配置
Schema Registry 管理 Kafka Topic 消息 Schema,确保生产者和消费者之间的数据一致性和兼容性。AutoMQ TableTopic 利用 Schema Registry 解析 Topic 数据并同步 Iceberg 表结构,支持 Schema 演进。
约束限制
Table Topic 当前仅支持以下 Schema Registry 实现。
-
Confluent Schema Registry :支持 Kafka 的 Schema 管理,提供 REST API。
-
Aiven Karapace :开源 Schema Registry,兼容 Confluent Schema Registry 的 REST API。
Table Topic 当前不支持 Schema Registry 的鉴权配置(如 Basic Auth、SASL、mTLS)。需将 Schema Registry 配置为 允许 AutoMQ 集群的匿名访问,并确保网络安全。
配置 Schema Registry
用户通过以下参数指定 Schema Registry 服务地址(集群级配置)。
配置参数 :
automq.table.topic.schema.registry.url
:Schema Registry 的 URL(如http://schema-registry.example.com:8081
)。AutoMQ 使用此 URL 获取和同步 Schema。
注意事项 :
- 在使用
automq.table.topic.schema.type=schema
时必须进行配置。
示例 :
automq.table.topic.schema.registry.url=https://schema-registry.example.com:8081
Catalog 配置
在 Apache Iceberg 中,Catalog 在管理表元数据方面扮演着至关重要的角色。其主要职责包括:
-
追踪每个 Iceberg 表的当前元数据指针 。该指针指示了最新元数据文件(例如
vN.metadata.json
)的位置。 -
为更新此元数据指针提供原子操作 。这对于在提交过程中(例如,写入新数据或演进 Schema 时)确保数据一致性至关重要。
-
将表组织到命名空间中 ,并提供列出、创建、删除和重命名表的方法。
从本质上讲,Catalog 是任何 Iceberg 操作的入口点,它告诉查询引擎在哪里可以找到关于表的 Schema、分区、快照和数据文件的权威信息。
配置 Catalog
Table Topic 利用外部 Iceberg Catalog 来管理其 "table topics" 的元数据。
-
通用配置前缀 :所有与 AutoMQ Table Topic 相关的 Iceberg Catalog 设置都使用前缀
automq.table.topic.catalog.*
。 -
主要配置键 :指定 Iceberg Catalog 类型,由
automq.table.topic.catalog.type
属性定义。
AutoMQ Table Topic 支持以下 Catalog 类型: rest
、 glue
、 tablebucket
、 nessie
、 hive
。
支持的 Catalog 类型及配置
1. REST Catalog
此方式使用标准的 Iceberg REST Catalog 服务。
-
类型设置 :
automq.table.topic.catalog.type=rest
-
配置属性 :
-
automq.table.topic.catalog.uri
:REST Catalog 服务的 URI(如http://rest:8181
)。 -
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
-
-
所需权限 :
-
凭证(若需认证)允许列出、创建、读取和更新表元数据。
-
AutoMQ 在 S3 仓库路径的读/写/删除权限。
-
示例 :
automq.table.topic.catalog.type=rest
automq.table.topic.catalog.uri=http://rest:8181
automq.table.topic.catalog.warehouse=s3://automq-bucket/wh/
2. AWS Glue Data Catalog
利用 AWS Glue 作为 Iceberg Catalog。
-
类型设置 :
automq.table.topic.catalog.type=glue
-
配置属性 :
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 S3 路径。
-
所需权限 :
-
AutoMQ 对 AWS Glue 的数据库和表管理权限。
-
AutoMQ 在 S3 仓库路径的读/写/删除权限。
示例:
automq.table.topic.catalog.type=glue
automq.table.topic.catalog.warehouse=s3://automq-bucket/glue/
3. TableBucket (S3Table)
-
类型设置 :
automq.table.topic.catalog.type=tablebucket
-
配置属性 :
automq.table.topic.catalog.warehouse
:指定 TableBucket ARN (例如:arn:aws:s3tables:::bucket-name
)。
-
所需权限 :
- AutoMQ 对 S3Table 的读写权限。
示例 :
automq.table.topic.catalog.type=tablebucket
automq.table.topic.catalog.warehouse=arn:aws:s3tables:us-east-1:xxxxxx:bucket/xxxxx
4. Nessie Catalog
使用 Project Nessie,这是一个具有类 Git 语义的事务性数据湖 Catalog。
-
类型设置 :
automq.table.topic.catalog.type=nessie
-
配置属性 :
-
automq.table.topic.catalog.uri
:指定 Nessie 服务器 URI (例如:http://nessie-server:19120/api/v2
)。 -
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
-
示例 :
automq.table.topic.catalog.type=nessie
automq.table.topic.catalog.uri=http://nessie-server:19120/api/v2
automq.table.topic.catalog.warehouse=s3://automq-bucket/nessie/
5. Hive Metastore Catalog
使用现有的 Hive Metastore 作为 Iceberg Catalog。配置 Hive Metastore 作为 Catalog 时,可以查看文档在 Hive 中启用 Iceberg。
-
类型设置 :
automq.table.topic.catalog.type=hive
-
配置属性 :
-
automq.table.topic.catalog.uri
:指定 Hive Metastore URI (例如:thrift://hostname:9083
)。 -
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 HDFS 或 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
-
-
认证配置 :
-
简单认证 :
automq.table.topic.catalog.auth=simple://?username=xxx
.
-
Kerberos 认证 :
automq.table.topic.catalog.auth=kerberos://?principal=base64(clientPrincipal)&keytab=base64(keytabFile)&krb5conf=base64(krb5confFile)-
clientPrincipal
:Base64 编码的客户端 Kerberos principal。 -
keytabFile
:Base64 编码的客户端 keytab 文件内容。 -
krb5confFile
:Base64 编码的krb5.conf
文件内容。 -
automq.table.topic.hadoop.metastore.kerberos.principal
:Hive Metastore 服务器的 Kerberos principal (例如:hive/_HOST@REALM
)。
-
-
所需权限 :
-
Hive Metastore 的创建、更改、删除表权限。
-
Kerberos 认证需有效 principal 和 keytab。
-
AutoMQ 在仓库路径(HDFS 或 S3)的读/写/删除权限。
-
示例 :
automq.table.topic.catalog.type=hive
automq.table.topic.catalog.uri=thrift://hostname:9083
automq.table.topic.catalog.warehouse=s3://automq-bucket/hive/
automq.table.topic.catalog.auth=simple://?username=user&password=pass
TableTopic 配置
以下配置均为 Topic 级别配置,通过此配置项,可进行 TableTopic 的功能启用设置,iceberg 提交间隔的调整,以及分区,upsert,分区等特性的配置。
启用 TableTopic 功能
AutoMQ TableTopic 功能可将指定 Kafka Topic 的数据转换为 Apache Iceberg 表以支持结构化查询和分析。
配置参数 :
-
automq.table.topic.enable
: 是否启用 TableTopic。设为 `true` 后,将创建 Iceberg 表存储数据。 -
automq.table.topic.namespace
: 指定 Iceberg 表在 Catalog 下的命名空间,需符合 Catalog 的命名规范(例如,字母、数字、下划线组合),在 Topic 创建时配置,不可后续修改。 -
automq.table.topic.schema.type
:指定 schema 模式,在 Topic 创建时配置,不可后续修改。默认 schemaless,支持以下值:-
schemaless
:不解析消息内容,仅基于 Kafka 消息自带 schema 写入,仅写入 record 的timestamp
,key
,value
字段,不依赖 Schema Registry。 -
schema
:依赖集 群配置中集成外部Schema Registry, 并且Producer在向该 Topic 生成消息时需要将 schema 注册到 Schema Registry。
-
示例:
automq.table.topic.enable=true
automq.table.topic.namespace=default
automq.table.topic.schema.type=schema
配置提交间隔
为支持实时分析,可调整 Iceberg 表提交频率以提升数据新鲜度。
配置参数 :
automq.table.topic.commit.interval.ms
:数据提交间隔(毫秒)。默认 300000(5 分钟),最大 900000(15 分钟)。较短间隔提升实时性,但增加处理开销。
注意事项 :
-
高频率提交可能导致 Commit 冲突和 MetadataFile 膨胀,增加存储和查询成本。
-
当前AutoMQ会自动删除1个小时前的Snapshot,避免随着时间的推移,metadata.json 文件变得过度臃肿。
-
需定期维护表,通过 Compaction 合并小文件,以优化 ManifestFile 体积和查询性能。
示例 :
automq.table.topic.commit.interval.ms=60000
优化查询性能:分区配置
为提升 Iceberg 表查询性能(尤其在选择性过滤场景),可配置分区规则。
配置参数 :
automq.table.topic.partition.by
:定义分区规则,支持基于字段或函数分区。例如,[bucket(name, 3), month(timestamp)]
表示按name
哈希分桶(3 个桶)并按timestamp
的月份分区。
支持的分区策略 :
-
桶分区:
bucket(field, N)
,按字段哈希值分区。 -
截断分区:
truncate(field, N)
,按字段截断值分区。 -
时间分区:
year(timestamp)
、month(timestamp)
、day(timestamp)
、hour(timestamp)
。
详情可参考 Iceberg 分区文档。
注意事项 :
- 分区过多可能增加元数据管理开销,导致查询计划生成延迟和存储成本上升。建议根据数据量和查询模式合理设置分区数,并定期通过 Compaction 优化小文件。
示例 :
automq.table.topic.partition.by=[bucket(name, 3), month(timestamp)]
支持 Upsert 和 CDC 模式
为支持更新(Upsert)或变更数据捕获(CDC)操作,可启用以下功能以实现动态数据管理。需配置主键以支持行级操作。
配置参数 :
-
automq.table.topic.id.columns
:指定主键列(支持复合主键),如[region, name]
。 -
automq.table.topic.upsert.enable
:是否启用 Upsert 模式。设为true
后,系统根据主键插入或更新记录。 -
automq.table.topic.cdc.field
:指定 CDC 操作类型字段,值包括I
(插入)、U
(更新)、D
(删除)。
注意事项 :
-
需配置
automq.table.topic.id.columns
以启用 Upsert 或 CDC 模式,否则仅支持追加写入。 -
Iceberg V2 表支持行级操作:
-
创建逻辑 :插入或更新记录存储在 datafile 中,删除记录通过 deletefile 标记(包含主键和删除标记)。Upsert 操作可能生成新的 datafile,CDC 的删除操作生成 deletefile。
-
查询逻辑 :使用 Merge-on-Read(MOR)机制,查询时合并 datafile 和 deletefile,通过 equality delete 过滤已标记删除的记录。
-
-
定期通过 Compaction 合并 datafile 和 deletefile,以优化查询性能。
示例 :
- 启用 Upsert 模式:
automq.table.topic.upsert.enable=true
automq.table.topic.id.columns=[id, name]
- 启用 CDC 模式
automq.table.topic.cdc.field=op_type
automq.table.topic.id.columns=[id, name]