Skip to main content

RisingWave

RisingWave [1] 是一个分布式流数据库,提供标准的 SQL 接口,与 PostgreSQL 生态系统兼容,无需改动代码即可集成。RisingWave 将流视作表,允许用户以优雅方式在流数据和历史数据上编写复杂查询。借助 RisingWave,用户可以专注于查询分析逻辑,而无需学习 Java 或特定系统的底层 API。

本文将介绍如何通过 RisingWave Cloud [2] 将数据从 AutoMQ 导入 RisingWave 数据库。

准备 AutoMQ 和测试数据

参考 Linux 主机部署多节点集群▸ 部署 AutoMQ,确保 AutoMQ 与 RisingWave 之间保持网络连通。

在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。

创建 Topic

使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:


./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建完主题后,可以使用以下命令来验证主题是否已成功创建。


./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

生成测试数据

生成一条 JSON 格式的测试数据,和前文的表需要对应。


{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

写入测试数据

通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:


echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

使用如下命令可以查看刚写入的 topic 数据:


sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

在 RisingWave Cloud 上创建 AutoMQ 源

  1. 前往 RisingWave Cloud Clusters [3] 创建集群。

  2. 前往 RisingWave Cloud Source [4] 创建源。

  3. 指定集群和数据库,并登入数据库。

  4. AutoMQ 100% 兼容 Apache Kafka, 因此只需要点击 Create source 并且选择 Kafka。

  5. 根据 RisingWave Cloud 的引导界面配置连接器,设置源信息和 schema 信息。

  6. 确认生成的 SQL 语句,点击 Confirm 完成源的创建。

AutoMQ 默认端口是 9092 并且没有开启 SSL。 如果需要启用 SSL,请参考文档 Apache Kafka Documentation [5] 。

本示例中可以通过设置启动模式为 earliest,并使用 JSON 格式来从头访问 topic 中的所有数据。

查询数据

  1. 前往 RisingWave Cloud Console [6] ,登入集群。

  2. 运行下方 SQL 语句,访问已经导入的数据,其中替换 your_source_name 变量为创建源时指定的自定义名称。


SELECT * from {your_source_name} limit 1;

引用

[1] RisingWave: https://risingwave.com/

[2] RisingWave Cloud: https://cloud.risingwave.com/

[3] Clusters: https://cloud.risingwave.com/clusters/

[4] Source: https://cloud.risingwave.com/source/

[5] Apache Kafka Documentation: https://kafka.apache.org/documentation/#security_ssl

[6] Console: https://cloud.risingwave.com/console/