Timeplus
Timeplus 是一款专注于流处理的数据分析平台,采用开源流式数据库 Proton,提供端到端的强大功能,支持团队快速、直观地处理流数据和历史数据。适用于各种规模和行业的组织,让数据工程师和平台工程师能够通过 SQL 充分释放流数据的价值。
本文介绍如何利用 Timeplus 控制台将 AutoMQ 数据导入 Timeplus。由于 AutoMQ 完全兼容 Apache Kafka,因此也可以创建 Kafka 外部流来分析 AutoMQ 数据,无需移动数据。
准备 AutoMQ 和测试数据
参考 Linux 主机部署多节点集群▸ 部署 AutoMQ,确保 AutoMQ 与 Timeplus 之间保持网络连通。
如果需要保持 IP 白名单,则需要将 Timeplus 服务的静态 IP 列入白名单:
52.83.159.13 对于 cloud.timeplus.com.cn
在 AutoMQ 中快速创建一个名为 example_topic
的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。
创建 Topic
使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:
./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1
在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。
创建完主题后,可以使用以下命令来验证主题是否已成功创建。
./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092
生成测试数据
生成一条 JSON 格式的测试数据,和前文的表需要对应。
{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}
写入测试数据
通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:
echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic
使用如下命令可以查看刚写入的 topic 数据:
sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning
在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。
AutoMQ 数据源
-
在左侧导航菜单中,点击“数据摄取”,再点击右上角的“添加数据”按钮。
-
在弹出窗口中,查看可连接的数据源和其他添加数据的方式。由于 AutoMQ 完全兼容 Apache Kafka,直接选择 Apache Kafka。
-
输入 broker URL,关闭 TLS 和身份验证。
-
输入 AutoMQ 主题名称,并选择“读取为”的数据格式,支持 JSON、AVRO 和文本格式。
-
建议选择 Text 以保存整个 JSON 文档为字符串,便于处理架构变化。
-
选择 AVRO 时,可开启“自动提取”选项,将顶级属性存储为不同列。需指定 schema 注册表地址、API 密钥和密钥。
-
-
在“预览”步骤中展示至少一个事件,新数据源默认在 Timeplus 中创建新流。
-
命名流并验证列信息,可设置事件时间列。若不设置,系统将使用摄取时间。也可选择现有流。
-
预览数据后,为源命名、添加描述并审查配置。点击“完成”,流数据将立即在指定流中可用。
AutoMQ 源说明
使用 AutoMQ 数据源,需要遵守如下约束:
-
当前仅支持 AutoMQ Kafka 主题中的消息采用 JSON 和 AVRO 格式。
-
主题级别 JSON 属性将被转换为流列。 对于嵌套属性,元素将被保存为 String 列,然后可以用 JSON functions 之一来查询它们。
-
JSON 消息中的数值或布尔值类型将被转换为流中的对应类型。
-
日期时间或时间戳将被保存为字符串列。 可以通过 to_time function 将它们转换回 DateTime。