Skip to main content

Table Topic 和腾讯云 Iceberg 集成

AutoMQ Table Topic 支持和 Iceberg 集成实现流式数据入湖分析和查询,免除 ETL 的配置和运维。本文介绍如何在腾讯云环境配置 Table Topic 和 Iceberg 的集成。

前置条件

腾讯云环境使用 AutoMQ Table Topic 功能,需要满足如下条件:

  • 版本约束: 要求 AutoMQ 实例版本 >= 1.4.1。

  • 实例约束: 必须在创建 AutoMQ 实例时开启 Table Topic 特性,后续才可使用 Table Topic。实例一旦创建完成,后续无法再开启 Table Topic 功能。

  • 资源要求: 在腾讯云上使用 Table Topic 必须提供可用的 Hive Catalog 服务。用户可以购买腾讯云托管的 EMR HMS 服务。

操作步骤

步骤 1:配置 Hive Catalog 服务

腾讯云上配置 Hive Catalog 可以自行搭建 Hive Metastore 服务,也可以购买腾讯云 EMR 托管的 HMS 服务。本文档以腾讯云托管的 EMR HMS 服务 为例,介绍配置方法。

  1. 前往腾讯云 EMR 产品,购买集群,并开启 Kerberos 身份认证。
配置 HMS 环境变量设置脚本
  1. 配置 HMS 环境变量设置脚本 hive-env.sh ,添加访问对象存储的 Jar。

# 添加 jar 依赖,让 hms 支持访问对象存储
# hive-env.sh
HIVE_AUX_JARS_PATH=/opt/apps/HADOOP-COMMON/hadoop-current/share/hadoop/tools/lib/aws-java-sdk-bundle-1.11.375.jar:/opt/apps/HADOOP-COMMON/hadoop-current/share/hadoop/tools/lib/hadoop-aws-3.2.1.jar

变更 hive-site.xml,添加以下对象存储访问相关配置。
  1. 变更 hive-site.xml,添加以下对象存储访问相关配置。

# 添加对象存储访问配置
# hive-site.xml
fs.s3a.endpoint=https://cos.ap-shanghai.myqcloud.com

fs.s3a.access.key=$access_key

fs.s3a.secret.key=$secret_key

fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

参数配置完成后,重启 Hive Metastore 服务,
  1. 参数配置完成后,重启 Hive Metastore 服务,使得变更生效。

  2. (若未开启 Keberos 身份认证,则可跳过此步骤)添加 Keberos 用户,并导出身份认证相关配置。登录到 EMR Master 机器,并以 root 身份运行以下命令,生成身份鉴权相关配置文件。


$ kadmin.local

addprinc -pw ${input your password} ${input your Principal}

ktadd -k /root/test.keytab ${input your Principal}
exit

cp /etc/krb5.conf ~/krb5.conf

# 修改 ~/krab5.conf 在 libdefaults 下添加 allow_weak_crypto = true (因为腾讯云部署的 kerberos 用的不安全的 des3-cbc-sha1 算法)
vim ~/krab5.conf

# 在下面的集成创建中将会使用到 krb5.conf 与 test.keytab 文件

  1. (若未开启 Keberos 身份认证,则可跳过此步骤)进入 Hive 服务的配置管理页面,从 hive-site.xml 文件中获取如下配置项的值:

    1. hive.metastore.uris ,作为后续步骤中的 User Principal。

    2. hive.metastore.kerberos.principal ,作为后续步骤中的 Kerberos Principal。

步骤 2:创建 Hive Catalog 集成

Hive Catalog 服务创建完成后,需要前往 AutoMQ 控制台创建 Hive Catalog 集成,用于录入步骤 1 中配置的 Catalog 信息。操作说明如下:

  1. 登录 AutoMQ 控制台,点击集成 菜单。
选择
  1. 选择创建 Hive Catalog 集成 ,填写如下信息:

    1. 名称: 填写有区分度的集成配置名称。

    2. 部署配置: 选择集成归属的部署配置,需要后续创建实例保持一致。

    3. Hive Metastore 接入点: 配置 Hive Metastore 接入点,后续 AutoMQ 会访问该接入点查询 Data Catalog。

    4. 鉴权类型: 根据实际需求,选择 不鉴权 或者 Kerberos 鉴权。

    5. KeyTab 文件: 如果开启 Kerberos 鉴权,需要上传步骤 1 生成的 KeyTab 文件。

    6. Krb5.conf 文件: 如果开启 Kerberos 鉴权,需要上传步骤 1 生成的 Krb5.conf 文件。

    7. Kerberos User Principal: 如果开启 Kerberos 鉴权,需要填写步骤 1 获取的 User Principal。

    8. Kerberos Principal: 如果开启 Kerberos 鉴权,需要填写步骤 1 获取的 Kerberos Principal。

    9. Warehouse :填写数据湖使用的对象存储 Bucket,该 Bucket 用于长期存储数据。

填写 Warehouse 参数后,AutoMQ 会生成访问该 Bucket 所需的 CAM Policy,并展示 AutoMQ 实例
  1. 填写 Warehouse 参数后,AutoMQ 会生成访问该 Bucket 所需的 CAM Policy,并展示 AutoMQ 实例当前使用的 IAM Role。请前往云厂商 IAM 控制台,参考该 Policy 创建授权。

  2. 创建授权完成后,即可点击创建 Hive Catalog 集成。

步骤 3:创建 AutoMQ 实例,开启 Table Topic 功能

使用 AutoMQ Table Topic 功能需要在创建实例时提前开启,后续才可实现数据流式入湖。因此创建实例时需要参考下方说明配置:

tip

注意:

AutoMQ 实例开启 Table Topic 后,并非所有 Topic 都会默认开启流转表,仍然需要按 Topic 粒度按需配置实现流式数据入湖。

如需使用 Table Topic,必须在创建实例时提前开启 Table Topic。实例一旦创建,即无法更改此配置。

步骤 4:创建 Topic,并配置流转表

AutoMQ 实例开启 Table Topic 功能后,即可在创建 Topic 时按需配置流转表。具体操作如下:

  1. 进入步骤 3 的实例,Topic 列表,点击创建 Topic

  2. 创建 Topic 的配置中,开启 Table Topic 转换,并配置如下参数:

    1. 命名空间: 命名空间用于隔离不同的 Iceberg 表,对应 Data Catalog 中的 Database。建议根据业务归属设置相应的参数值。

    2. Schema 约束类型: 设置 Topic 消息是否遵守 Schema 约束。如果选择** 'Schema' ** 则启用 Schema 约束,需要向 AutoMQ 内置 SchemaRegistry 注册消息 Schema,后续发送消息时严格按照 Schema,后续 Table Topic 会使用该 Schema 的字段填充 Iceberg 表;如果选择** 'Schemaless'** ,则代表消息内容无明确 Schema 约束,此时会将消息 Key 和 Value 作为整体字段填充 Iceberg 表。

点击确定
  1. 点击确定 ,创建支持流转表的 Topic。

步骤 5: 生产消息,通过 Spark-sql 查询 Iceberg 表数据

完成 AutoMQ 实例配置以及 Table Topic 创建后,即可测试生产数据,并在 Iceberg 表查询数据。

  1. 点击进入 Topic 详情,生产消息 Tab,输入测试的消息 Key 和消息 Value,发送消息。
新增Spark组件,然后登陆到 EMR 集群的 Master 节点,切换到 hadoop 用户。
  1. 新增Spark组件,然后登陆到 EMR 集群的 Master 节点,切换到 hadoop 用户。

  2. 执行如下命令,进入 spark-sql。


export HIVE_URI="thrift://YOUR_HOST:7004"
export WAREHOUSE="s3a://YOUR_BUCKER/iceberg"
export AWS_ACCESS_KEY=
export AWS_SECRET_KEY=
export AWS_REGION=
export AWS_S3_ENDPOINT='https://cos.$AWS_REGION.myqcloud.com'

spark-sql --master local[*] \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.hive=org.apache.iceberg.spark.SparkCatalog \
--conf spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1,software.amazon.awssdk:bundle:2.31.14,org.apache.hadoop:hadoop-aws:3.3.1,software.amazon.awssdk:url-connection-client:2.17.178 \
--conf spark.sql.defaultCatalog=hive \
--conf spark.sql.catalog.hive.type=hive \
--conf spark.sql.catalog.hive.uri=$HIVE_URI \
--conf spark.sql.catalog.hive.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.hive.s3.endpoint=$AWS_S3_ENDPOINT \
--conf spark.sql.catalog.hive.warehouse=$WAREHOUSE \
--conf spark.sql.catalog.hive.client.region=$AWS_REGION \
--conf spark.sql.catalog.hive.s3.path-style-access=false \
--conf spark.sql.catalog.hive.s3.access-key-id=$AWS_ACCESS_KEY \
--conf spark.sql.catalog.hive.s3.secret-access-key=$AWS_SECRET_KEY \
--conf spark.hadoop.fs.s3a.path.style.access=false \
--conf spark.hadoop.fs.s3a.region=$AWS_REGION

  1. 执行SQL, 进行查询。可以看到 AutoMQ 实时将 Kafka 消息转换成对应数据表的数据记录。用户也可以使用其他查询引擎进行分析和计算。