RisingWave
RisingWave 是一个分布式流数据库,提供标准的 SQL 接口,与 PostgreSQL 生态系统兼容,无需改动代码即可集成。RisingWave 将流视作表,允许用户以优雅方式在流数据和历史数据上编写复杂查询。借助 RisingWave,用户可以专注于查询分析逻辑,而无需学习 Java 或特定系统的底层 API。
本文将介绍如何通过 RisingWave Cloud 将数据从 AutoMQ 导入 RisingWave 数据库。
准备 AutoMQ 和测试数据
参考 本地部署▸ 部署 AutoMQ,确保 AutoMQ 与 RisingWave 之间保持网络连通。
在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。
创建 Topic
使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:
./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1
在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。
创建完主题后,可以使用以下命令来验证主题是否已成功创建。
./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092
生成测试数据
生成一条 JSON 格式的测试数据,和前文的表需要对应。
{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}
写入测试数据
通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:
echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic
使用如下命令可以查看刚写入的 topic 数据:
sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning
在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。
在 RisingWave Cloud 上创建 AutoMQ 源
-
前往 RisingWave Cloud Clusters 创建集群。
-
前往 RisingWave Cloud Source 创建源。
-
指定集群和数据库,并登入数据库。
-
AutoMQ 100% 兼容 Apache Kafka, 因此只需要点击 Create source 并且选择 Kafka。
-
根据 RisingWave Cloud 的引导界面配置连接器,设置源信息和 schema 信息。
-
确认生成的 SQL 语句,点击 Confirm 完成源的创建。
AutoMQ 默认端口是 9092 并且没有开启 SSL。 如果需要启用 SSL,请参考文档 Apache Kafka Documentation。
本示例中可以通过设置启动模式为 earliest,并使用 JSON 格式来从头访问 topic 中的所有数据。
查询数据
-
前往 RisingWave Cloud Console ,登入集群。
-
运行下方 SQL 语句,访问已经导入的数据,其中替换 your_source_name 变量为创建源时指定的自定义名称。
SELECT * from {your_source_name} limit 1;