Skip to main content

StarRocks

StarRocks 是一款高性能的分析型数据仓库,采用向量化、MPP 架构、CBO、智能物化视图以及可实时更新的列式存储引擎等先进技术,能够支持多维、实时和高并发的数据分析。

本文将介绍如何使用 StarRocks Routine Load 将 AutoMQ 中的数据导入 StarRocks。详细了解 Routine Load 的基本原理,请参考 Routine Load 基本原理文档。

环境准备

准备 StarRocks 和测试数据

确保当前已准备好可用的 StarRocks 集群。为了便于演示,我们参考 使用 Docker 部署 StarRocks 在一台 Linux 机器上安装用于演示的集群。

创建库和主键模型的测试表:



create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
"replication_num" = "1",
"enable_persistent_index" = "true"
);

准备 AutoMQ 和测试数据

参考 Linux 主机部署多节点集群▸ 部署 AutoMQ,确保 AutoMQ 与 StarRocks 之间保持网络连通。

在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。

创建 Topic

使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:


./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建完主题后,可以使用以下命令来验证主题是否已成功创建。


./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

生成测试数据

生成一条 JSON 格式的测试数据,和前文的表需要对应。


{
"id": 1,
"name": "测试用户",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

写入测试数据

通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:


echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

使用如下命令可以查看刚写入的 topic 数据:


sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建 Routine Load 导入作业

在 StarRocks 的命令行中创建一个 Routine Load 作业,用来持续导入 AutoMQ Kafka topic 中的数据。


CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

在执行命令时,需要将 kafka_broker_list 替换为实际使用的 Kafka 服务器地址。

参数说明

数据格式

需要 PROPERTIES 子句的 "format" = "json" 中指定数据格式为 JSON。

数据提取和转换

如果需要指定源数据和目标表之间列的映射和转换关系,可以配置 COLUMNSjsonpaths 参数。在 COLUMNS 中,列名对应目标表 的列名,列的顺序对应源数据 中的列的顺序。而 jsonpaths 参数则用于提取 JSON 数据中所需的字段数据,类似于新生成的 CSV 数据。接着, COLUMNS 参数会按照 jsonpaths 中字段的顺序进行临时命名。想了解更多关于数据转换的内容,请查看导入时实现数据转换文档

如果每行一个 JSON 对象中 key 的名称和数量(顺序不需要对应)都能对应目标表中列,则无需配置 COLUMNS

验证数据导入

首先,检查 Routine Load 导入作业的状态,确保任务正在运行中。


show routine load\G;

然后查询 StarRocks 数据库中的相关表,可以看到数据已经被成功导入。


StarRocks > select * from users;
+------+--------------+---------------------+--------+
| id | name | timestamp | status |
+------+--------------+---------------------+--------+
| 1 | 测试用户 | 2023-11-10T12:00:00 | active |
| 2 | 测试用户 | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)