StarRocks

StarRocks is a high-performance analytical data warehouse that leverages advanced technologies such as vectorization, an MPP architecture, a cost-based optimizer (CBO), intelligent materialized views, and a real-time updatable columnar storage engine. It supports multidimensional, real-time, and high-concurrency data analysis.

This article introduces how to use StarRocks Routine Load to import data from AutoMQ into StarRocks. For a detailed understanding of how Routine Load works, please refer to the Routine Load Basic Principles documentation.

Prerequisites

Prepare StarRocks and Test Data

Ensure that a usable StarRocks cluster is available. For demonstration purposes, this article follows Deploy StarRocks with Docker to install a demo cluster on a Linux machine.
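For reference, here is a minimal sketch of that quick-start deployment, assuming the starrocks/allin1-ubuntu all-in-one image and the default ports from the StarRocks quick-start documentation; the container name quickstart is an arbitrary choice:

# Start a single-node StarRocks demo (FE and BE in one container).
# 9030 is the MySQL-protocol port used by clients; 8030/8040 are FE/BE HTTP ports.
docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 -itd --name quickstart starrocks/allin1-ubuntu

# Connect with any MySQL-compatible client; the default root user has no password.
mysql -h 127.0.0.1 -P 9030 -u root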

Create a database and a test table that uses the Primary Key model:



create database automq_db;
create table users (
    id bigint NOT NULL,
    name string NOT NULL,
    timestamp string NULL,
    status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
    "replication_num" = "1",
    "enable_persistent_index" = "true"
);

Prepare AutoMQ and Test Data

Refer to Deploy Multi-Nodes Cluster on Linux to deploy AutoMQ and ensure network connectivity between AutoMQ and StarRocks.

Quickly create a topic named example_topic in AutoMQ and write test JSON data to it by following these steps.

Create Topic

Use the Apache Kafka® command-line tool to create a topic. Ensure you have access to the Kafka environment and that the Kafka service is running. Below is an example command to create a topic:


./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

When executing the command, replace topic with your actual topic name and bootstrap-server with your actual Kafka server address.

After creating the topic, you can use the following command to verify whether the topic has been successfully created.


./kafka-topics.sh --describe --topic example_topic --bootstrap-server 10.0.96.4:9092

Generate Test Data

Generate test data in JSON format that corresponds to the table mentioned earlier.


{
  "id": 1,
  "name": "test_user",
  "timestamp": "2023-11-10T12:00:00",
  "status": "active"
}

Write Test Data

Use Kafka's command-line tools or a programmatic producer to write the test data into the topic named example_topic. Here is an example using the command-line tool:


echo '{"id": 1, "name": "Test User", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic
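To produce more than one record, a minimal shell sketch such as the following writes several JSON messages in one pass; the loop bounds and field values are arbitrary test data:

# Write JSON records with increasing ids to example_topic.
for i in 1 2; do
  echo "{\"id\": $i, \"name\": \"Test User\", \"timestamp\": \"2023-11-10T12:00:00\", \"status\": \"active\"}"
done | ./kafka-console-producer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic

Because users is a Primary Key table, writing id 1 again simply upserts the existing row rather than creating a duplicate.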

Use the following command to view the data just written to the Topic:


./kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

When executing the command, replace topic with your actual topic name and bootstrap-server with your actual Kafka server address.

Creating Routine Load Import Job

Create a Routine Load job in the StarRocks command line to continuously import data from the AutoMQ Kafka Topic.


CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

When executing the command, replace kafka_broker_list with the actual Kafka server address.

Parameter Description

Data Format

The data format needs to be specified as JSON in the PROPERTIES clause with "format" = "json".

Data Extraction and Transformation

If you need to map or transform source data onto the target table's columns, configure the COLUMNS and jsonpaths parameters together. The jsonpaths parameter extracts the required fields from each JSON message, producing an intermediate row much like freshly generated CSV data; COLUMNS then names those fields in the order given by jsonpaths, and each name must correspond to a column of the target table. For more information on data transformation, please refer to Data Conversion Implementation During Import.

If each line is a JSON object whose key names and number match the target table's columns (order does not need to match), the COLUMNS configuration is not required; a mapping example follows below.
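To illustrate the mapping, here is a hypothetical variant of the job above, assuming the source messages carry the user's name under a key user_name while the target column is called name; the binding is positional, so the second jsonpaths entry feeds the second COLUMNS entry:

CREATE ROUTINE LOAD automq_example_load_mapped ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
    "format" = "json",
    "jsonpaths" = "[\"$.id\",\"$.user_name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.0.96.4:9092",
    "kafka_topic" = "example_topic"
);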

Validate Data Import

First, check the status of the Routine Load job to make sure it is running.


SHOW ROUTINE LOAD\G
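In the output, the State column should read RUNNING. If you later need to manage the job, StarRocks provides the usual control statements; substitute your own job name if it differs:

-- Pause the job; consumption stops, but the job can be resumed.
PAUSE ROUTINE LOAD FOR automq_example_load;

-- Resume a paused job.
RESUME ROUTINE LOAD FOR automq_example_load;

-- Stop the job permanently; a stopped job cannot be resumed.
STOP ROUTINE LOAD FOR automq_example_load;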

Then query the users table in StarRocks to confirm that the data has been imported successfully:


StarRocks > select * from users;
+------+-----------+---------------------+--------+
| id   | name      | timestamp           | status |
+------+-----------+---------------------+--------+
|    1 | Test User | 2023-11-10T12:00:00 | active |
|    2 | Test User | 2023-11-10T12:00:00 | active |
+------+-----------+---------------------+--------+
2 rows in set (0.01 sec)
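Routine Load is a continuous job: new messages written to the topic are picked up automatically. As a quick check, produce one more record (the id 3 and timestamp values here are arbitrary) and re-run the query after a few seconds:

echo '{"id": 3, "name": "Test User", "timestamp": "2023-11-10T12:05:00", "status": "inactive"}' | ./kafka-console-producer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic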