Schema Registry 配置
Schema Registry 管理 Kafka Topic 消息 Schema,确保生产者和消费者之间的数据一致性和兼容性。AutoMQ TableTopic 利用 Schema Registry 解析 Topic 数据并同步 Iceberg 表结构,支持 Schema 演进。约束限制
Table Topic 当前仅支持以下 Schema Registry 实现。- Confluent Schema Registry :支持 Kafka 的 Schema 管理,提供 REST API。
- Aiven Karapace :开源 Schema Registry,兼容 Confluent Schema Registry 的 REST API。
Table Topic 当前不支持 Schema Registry 的鉴权配置(如 Basic Auth、SASL、mTLS)。需将 Schema Registry 配置为允许 AutoMQ 集群的匿名访问,并确保网络安全。
配置 Schema Registry
用户通过以下参数指定 Schema Registry 服务地址(集群级配置)。 配置参数 :automq.table.topic.schema.registry.url
:Schema Registry 的 URL(如http://schema-registry.example.com:8081
)。AutoMQ 使用此 URL 获取和同步 Schema。
- 在使用
automq.table.topic.schema.type=schema
时必须进行配置。
Catalog 配置
在 Apache Iceberg 中,Catalog 在管理表元数据方面扮演着至关重要的角色。其主要职责包括:-
追踪每个 Iceberg 表的当前元数据指针 。该指针指示了最新元数据文件(例如
vN.metadata.json
)的位置。 - 为更新此元数据指针提供原子操作 。这对于在提交过程中(例如,写入新数据或演进 Schema 时)确保数据一致性至关重要。
- 将表组织到命名空间中 ,并提供列出、创建、删除和重命名表的方法。
配置 Catalog
Table Topic 利用外部 Iceberg Catalog 来管理其 “table topics” 的元数据。-
通用配置前缀 :所有与 AutoMQ Table Topic 相关的 Iceberg Catalog 设置都使用前缀
automq.table.topic.catalog.*
。 -
主要配置键 :指定 Iceberg Catalog 类型,由
automq.table.topic.catalog.type
属性定义。
rest
、 glue
、 tablebucket
、 nessie
、 hive
。
支持的 Catalog 类型及配置
1. REST Catalog
此方式使用标准的 Iceberg REST Catalog 服务。-
类型设置 :
automq.table.topic.catalog.type=rest
-
配置属性 :
-
automq.table.topic.catalog.uri
:REST Catalog 服务的 URI(如http://rest:8181
)。 -
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
-
-
所需权限 :
- 凭证(若需认证)允许列出、创建、读取和更新表元数据。
- AutoMQ 在 S3 仓库路径的读/写/删除权限。
2. AWS Glue Data Catalog
利用 AWS Glue 作为 Iceberg Catalog。-
类型设置 :
automq.table.topic.catalog.type=glue
-
配置属性 :
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 S3 路径。
- 所需权限 :
- AutoMQ 对 AWS Glue 的数据库和表管理权限。
- AutoMQ 在 S3 仓库路径的读/写/删除权限。
3. TableBucket (S3Table)
-
类型设置 :
automq.table.topic.catalog.type=tablebucket
-
配置属性 :
automq.table.topic.catalog.warehouse
:指定 TableBucket ARN (例如:arn:aws:s3tables:::bucket-name
)。
-
所需权限 :
- AutoMQ 对 S3Table 的读写权限。
4. Nessie Catalog
使用 Project Nessie,这是一个具有类 Git 语义的事务性数据湖 Catalog。-
类型设置 :
automq.table.topic.catalog.type=nessie
-
配置属性 :
-
automq.table.topic.catalog.uri
:指定 Nessie 服务器 URI (例如:http://nessie-server:19120/api/v2
)。 -
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
-
5. Hive Metastore Catalog
使用现有的 Hive Metastore 作为 Iceberg Catalog。配置 Hive Metastore 作为 Catalog 时,可以查看文档在 Hive 中启用 Iceberg。-
类型设置 :
automq.table.topic.catalog.type=hive
-
配置属性 :
-
automq.table.topic.catalog.uri
:指定 Hive Metastore URI (例如:thrift://hostname:9083
)。 -
automq.table.topic.catalog.warehouse
:定义 Iceberg 数据仓库的 HDFS 或 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
-
-
认证配置 :
-
简单认证 :
automq.table.topic.catalog.auth=simple://?username=xxx
.
- Kerberos 认证 :
-
clientPrincipal
:Base64 编码的客户端 Kerberos principal。 -
keytabFile
:Base64 编码的客户端 keytab 文件内容。 -
krb5confFile
:Base64 编码的krb5.conf
文件内容。 -
automq.table.topic.hadoop.metastore.kerberos.principal
:Hive Metastore 服务器的 Kerberos principal (例如:hive/_HOST@REALM
)。
-
简单认证 :
-
所需权限 :
- Hive Metastore 的创建、更改、删除表权限。
- Kerberos 认证需有效 principal 和 keytab。
- AutoMQ 在仓库路径(HDFS 或 S3)的读/写/删除权限。
TableTopic 配置
以下配置均为 Topic 级别配置,通过此配置项,可进行 TableTopic 的功能启用设置,iceberg 提交间隔的调整,以及分区,upsert,分区等特性的配置。启用 TableTopic 功能
AutoMQ TableTopic 功能可将指定 Kafka Topic 的数据转换为 Apache Iceberg 表以支持结构化查询和分析。 配置参数 :-
automq.table.topic.enable
: 是否启用 TableTopic。设为 `true` 后,将创建 Iceberg 表存储数据。 -
automq.table.topic.namespace
: 指定 Iceberg 表在 Catalog 下的命名空间,需符合 Catalog 的命名规范(例如,字母、数字、下划线组合),在 Topic 创建时配置,不可后续修改。 -
automq.table.topic.schema.type
:指定 schema 模式,在 Topic 创建时配置,不可后续修改。默认 schemaless,支持以下值:-
schemaless
:不解析消息内容,仅基于 Kafka 消息自带 schema 写入,仅写入 record 的timestamp
,key
,value
字段,不依赖 Schema Registry。 -
schema
:依赖集群配置中集成外部Schema Registry, 并且Producer在向该 Topic 生成消息时需要将 schema 注册到 Schema Registry。
-
配置提交间隔
为支持实时分析,可调整 Iceberg 表提交频率以提升数据新鲜度。 配置参数 :automq.table.topic.commit.interval.ms
:数据提交间隔(毫秒)。默认 300000(5 分钟),最大 900000(15 分钟)。较短间隔提升实时性,但增加处理开销。
- 高频率提交可能导致 Commit 冲突和 MetadataFile 膨胀,增加存储和查询成本。
- 当前AutoMQ会自动删除1个小时前的Snapshot,避免随着时间的推移,metadata.json 文件变得过度臃肿。
- 需定期维护表,通过 Compaction 合并小文件,以优化 ManifestFile 体积和查询性能。
优化查询性能:分区配置
为提升 Iceberg 表查询性能(尤其在选择性过滤场景),可配置分区规则。 配置参数 :automq.table.topic.partition.by
:定义分区规则,支持基于字段或函数分区。例如,[bucket(name, 3), month(timestamp)]
表示按name
哈希分桶(3 个桶)并按timestamp
的月份分区。
-
桶分区:
bucket(field, N)
,按字段哈希值分区。 -
截断分区:
truncate(field, N)
,按字段截断值分区。 -
时间分区:
year(timestamp)
、month(timestamp)
、day(timestamp)
、hour(timestamp)
。
- 分区过多可能增加元数据管理开销,导致查询计划生成延迟和存储成本上升。建议根据数据量和查询模式合理设置分区数,并定期通过 Compaction 优化小文件。
支持 Upsert 和 CDC 模式
为支持更新(Upsert)或变更数据捕获(CDC)操作,可启用以下功能以实现动态数据管理。需配置主键以支持行级操作。 配置参数 :-
automq.table.topic.id.columns
:指定主键列(支持复合主键),如[region, name]
。 -
automq.table.topic.upsert.enable
:是否启用 Upsert 模式。设为true
后,系统根据主键插入或更新记录。 -
automq.table.topic.cdc.field
:指定 CDC 操作类型字段,值包括I
(插入)、U
(更新)、D
(删除)。
-
需配置
automq.table.topic.id.columns
以启用 Upsert 或 CDC 模式,否则仅支持追加写入。 -
Iceberg V2 表支持行级操作:
- 创建逻辑 :插入或更新记录存储在 datafile 中,删除记录通过 deletefile 标记(包含主键和删除标记)。Upsert 操作可能生成新的 datafile,CDC 的删除操作生成 deletefile。
- 查询逻辑 :使用 Merge-on-Read(MOR)机制,查询时合并 datafile 和 deletefile,通过 equality delete 过滤已标记删除的记录。
- 定期通过 Compaction 合并 datafile 和 deletefile,以优化查询性能。
- 启用 Upsert 模式:
- 启用 CDC 模式