Table Topic 和腾讯云 Iceberg 集成
AutoMQ Table Topic 支持和 Iceberg 集成实现流式数据入湖分析和查询,免除 ETL 的配置和运维。本文介绍如何在腾讯云环境配置 Table Topic 和 Iceberg 的集成。
前置条件
腾讯云环境使用 AutoMQ Table Topic 功能,需要满足如下条件:
-
版本约束: 要求 AutoMQ 实例版本 >= 1.4.1。
-
实例约束: 必须在创建 AutoMQ 实例时开启 Table Topic 特性,后续才可使用 Table Topic。实例一旦创建完成,后续无法再开启 Table Topic 功能。
-
资源要求: 在腾讯云上使用 Table Topic 必须提供可用的 Hive Catalog 服务。用户可以购买腾讯云托管的 EMR HMS 服务。
操作步骤
步骤 1:配置 Hive Catalog 服务
腾讯云上配置 Hive Catalog 可以自行搭建 Hive Metastore 服务,也可以购买腾讯云 EMR 托管的 HMS 服务。本文档以腾讯云托管的 EMR HMS 服务 为例,介绍配置方法。
- 前往腾讯云 EMR 产品,购买集群,并开启 Kerberos 身份认证。

- 配置 HMS 环境变量设置脚本 hive-env.sh ,添加访问对象存储的 Jar。
# 添加 jar 依赖,让 hms 支持访问对象存储
# hive-env.sh
HIVE_AUX_JARS_PATH=/opt/apps/HADOOP-COMMON/hadoop-current/share/hadoop/tools/lib/aws-java-sdk-bundle-1.11.375.jar:/opt/apps/HADOOP-COMMON/hadoop-current/share/hadoop/tools/lib/hadoop-aws-3.2.1.jar

- 变更 hive-site.xml,添加以下对象存储访问相关配置。
# 添加对象存储访问配置
# hive-site.xml
fs.s3a.endpoint=https://cos.ap-shanghai.myqcloud.com
fs.s3a.access.key=$access_key
fs.s3a.secret.key=$secret_key
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

-
参数配置完成后,重启 Hive Metastore 服务,使得变更生效。
-
(若未开启 Keberos 身份认证,则可跳过此步骤)添加 Keberos 用户,并导出身份认证相关配置。登录到 EMR Master 机器,并以 root 身份运行以下命令,生成身份鉴权相关配置文件。
$ kadmin.local
addprinc -pw ${input your password} ${input your Principal}
ktadd -k /root/test.keytab ${input your Principal}
exit
cp /etc/krb5.conf ~/krb5.conf
# 修改 ~/krab5.conf 在 libdefaults 下添加 allow_weak_crypto = true (因为腾讯云部署的 kerberos 用的不安全的 des3-cbc-sha1 算法)
vim ~/krab5.conf
# 在下面的集成创建中将会使用到 krb5.conf 与 test.keytab 文件
-
(若未开启 Keberos 身份认证,则可跳过此步骤)进入 Hive 服务的配置管理页面,从
hive-site.xml
文件中获取如下配置项的值:-
hive.metastore.uris
,作为后续步骤中的 User Principal。 -
hive.metastore.kerberos.principal
,作为后续步骤中的 Kerberos Principal。
-
步骤 2:创建 Hive Catalog 集成
Hive Catalog 服务创建完成后,需要前往 AutoMQ 控制台创建 Hive Catalog 集成,用于录入步骤 1 中配置的 Catalog 信息。操作说明如下:
- 登录 AutoMQ 控制台,点击集成 菜单。

-
选择创建 Hive Catalog 集成 ,填写如下信息:
-
名称: 填写有区分度的集成配置名称。
-
部署配置: 选择集成归属的部署配置,需要后续创建实例保持一致。
-
Hive Metastore 接入点: 配置 Hive Metastore 接入点,后续 AutoMQ 会访问该接入点查询 Data Catalog。
-
鉴权类型: 根据实际需求,选择 不鉴权 或者 Kerberos 鉴权。
-
KeyTab 文件: 如果开启 Kerberos 鉴权,需要上传步骤 1 生成的 KeyTab 文件。
-
Krb5.conf 文件: 如果开启 Kerberos 鉴权,需要上传步骤 1 生成的 Krb5.conf 文件。
-
Kerberos User Principal: 如果开启 Kerberos 鉴权,需要填写步骤 1 获取的 User Principal。
-
Kerberos Principal: 如果开启 Kerberos 鉴权,需要填写步骤 1 获取的 Kerberos Principal。
-
Warehouse :填写数据湖使用的对象存储 Bucket,该 Bucket 用于长期存储数据。
-

-
填写 Warehouse 参 数后,AutoMQ 会生成访问该 Bucket 所需的 CAM Policy,并展示 AutoMQ 实例当前使用的 IAM Role。请前往云厂商 IAM 控制台,参考该 Policy 创建授权。
-
创建授权完成后,即可点击创建 Hive Catalog 集成。
步骤 3:创建 AutoMQ 实例,开启 Table Topic 功能
使用 AutoMQ Table Topic 功能需要在创建实例时提前开启,后续才可实现数据流式入湖。因此创建实例时需要参考下方说明配置:
注意:
AutoMQ 实例开启 Table Topic 后,并非所有 Topic 都会默认开启流转表,仍然需要按 Topic 粒度按需配置实现流式数据入湖。
如需使用 Table Topic,必须在创建实例时提前开启 Table Topic。实例一旦创建,即无法更改此配置。

步骤 4:创建 Topic,并配置流转表
AutoMQ 实例开启 Table Topic 功能后,即可在创建 Topic 时按需配置流转表。具体操作如下:
-
进入步骤 3 的实例,Topic 列表,点击创建 Topic 。
-
创建 Topic 的配置中,开启 Table Topic 转换,并配置如下参数:
-
命名空间: 命名空间用于隔离不同的 Iceberg 表,对应 Data Catalog 中的 Database。建议根据业务归属设置相应的参数值。
-
Schema 约束类型: 设置 Topic 消息是否遵守 Schema 约束。如果选择** 'Schema' ** 则启用 Schema 约束,需要向 AutoMQ 内置 SchemaRegistry 注册消息 Schema,后续发送消息时严格按照 Schema,后续 Table Topic 会使用该 Schema 的字段填充 Iceberg 表;如果选择** 'Schemaless'** ,则代表消息内容无明确 Schema 约束,此时会将消息 Key 和 Value 作为整体字段填充 Iceberg 表。
-

- 点击确定 ,创建支持流转表的 Topic。
步骤 5: 生产消息,通过 Spark-sql 查询 Iceberg 表数据
完成 AutoMQ 实例配置以及 Table Topic 创建后,即可测试生产数据,并在 Iceberg 表查询数据。
- 点击进入 Topic 详情,生产消息 Tab,输入测试的消息 Key 和消息 Value,发送消息。

-
新增Spark组件,然后登陆到 EMR 集群的 Master 节点,切换到 hadoop 用户。
-
执行如下命令,进入 spark-sql。
export HIVE_URI="thrift://YOUR_HOST:7004"
export WAREHOUSE="s3a://YOUR_BUCKER/iceberg"
export AWS_ACCESS_KEY=
export AWS_SECRET_KEY=
export AWS_REGION=
export AWS_S3_ENDPOINT='https://cos.$AWS_REGION.myqcloud.com'
spark-sql --master local[*] \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.hive=org.apache.iceberg.spark.SparkCatalog \
--conf spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1,software.amazon.awssdk:bundle:2.31.14,org.apache.hadoop:hadoop-aws:3.3.1,software.amazon.awssdk:url-connection-client:2.17.178 \
--conf spark.sql.defaultCatalog=hive \
--conf spark.sql.catalog.hive.type=hive \
--conf spark.sql.catalog.hive.uri=$HIVE_URI \
--conf spark.sql.catalog.hive.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.hive.s3.endpoint=$AWS_S3_ENDPOINT \
--conf spark.sql.catalog.hive.warehouse=$WAREHOUSE \
--conf spark.sql.catalog.hive.client.region=$AWS_REGION \
--conf spark.sql.catalog.hive.s3.path-style-access=false \
--conf spark.sql.catalog.hive.s3.access-key-id=$AWS_ACCESS_KEY \
--conf spark.sql.catalog.hive.s3.secret-access-key=$AWS_SECRET_KEY \
--conf spark.hadoop.fs.s3a.path.style.access=false \
--conf spark.hadoop.fs.s3a.region=$AWS_REGION
- 执行SQL, 进行查询。可以看到 AutoMQ 实时将 Kafka 消息转换成对应数据表的数据记录。用户也可以使用其他查询引擎进行分析和计算。
