Debezium
Introduction
In today's enterprises, growing data-processing demands have made it critically important to monitor and respond to database changes in real time. Whether it involves order processing and inventory management on e-commerce platforms or transaction monitoring in financial systems, capturing and handling database changes as they happen is essential. This capability improves system responsiveness and allows the business to act on data changes in a timely manner.
This article will describe how to use Debezium [1] to monitor changes in MySQL databases and relay these change events to AutoMQ [2]. AutoMQ is an efficient and cost-effective stream processing system, offering high elasticity and availability, making it ideal for enterprises that require real-time data processing. By adopting this approach, businesses can manage order changes, inventory updates, and more, with the ability to set alert rules for automated monitoring. Downstream services can consume these messages to gain a current perspective on database changes, respond swiftly to business needs, optimize system performance, and ensure business continuity and stability.
Overview of AutoMQ
AutoMQ is a stream processing system reimagined for the cloud: it maintains full compatibility with Apache Kafka while improving cost-effectiveness and resilience by decoupling storage from compute. Specifically, AutoMQ offloads storage to shared cloud storage such as EBS and S3 through S3Stream, its stream storage engine built on object storage, providing low-cost, low-latency, highly available, highly reliable, and virtually unlimited stream storage.
Compared to the traditional Shared Nothing architecture, AutoMQ employs a Shared Storage architecture that significantly reduces storage and operational complexity while enhancing both system elasticity and reliability. The design philosophy and technical advantages of AutoMQ make it an ideal choice for enterprises looking to replace existing Kafka clusters. By adopting AutoMQ, enterprises can notably lower storage costs, simplify operations, and achieve automatic cluster scaling and traffic self-balancing, making it more efficient to adapt to changing business demands. Additionally, AutoMQ’s architecture supports efficient cold read operations and zero-downtime services, ensuring stable operation under high loads and sudden traffic surges. Its storage structure is detailed below:
Debezium Overview
Debezium is an open-source project that provides a low-latency streaming platform for Change Data Capture (CDC). By installing and configuring Debezium, you can monitor database changes and convert these change events into Kafka messages. Debezium supports multiple databases as data sources, including MySQL, PostgreSQL, and MongoDB, ensuring that only committed changes are visible. This means applications need not worry about transactions or rollbacks. Moreover, because Debezium uses persistent, replica-backed logs to record the history of database data changes, your application can stop and restart at any time without missing events that occurred during downtime, ensuring all events are processed correctly and completely.
Debezium leverages Kafka and Kafka Connect's persistence, reliability, and fault tolerance, with each connector monitoring an upstream database server, capturing all database changes, and recording them into Kafka Topics. This allows multiple clients to independently consume the same data change events while minimizing impact on the upstream database. Common use cases for Debezium include cache invalidation, simplifying monolithic applications, sharing databases, and data integration. With Debezium, enterprises can achieve real-time monitoring and processing of database changes, meeting various business scenario needs such as real-time data synchronization and event-driven architecture. Its architecture diagram is presented below:
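For context, each captured change lands in a Kafka topic as a structured event. Below is a simplified sketch of the value of an UPDATE event; real events also carry a schema section and richer source metadata, and the field values here are illustrative only:

```json
{
  "payload": {
    "before": { "id": 1004, "email": "annek@noanswer.org" },
    "after":  { "id": 1004, "email": "anne@newdomain.com" },
    "source": { "connector": "mysql", "db": "inventory", "table": "customers" },
    "op": "u",
    "ts_ms": 1722822703000
  }
}
```

The "op" field distinguishes the change type: "c" for create, "u" for update, "d" for delete, and "r" for snapshot reads.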
Prerequisites
- An available Docker environment.
- An available AutoMQ node for receiving data change event messages.
- A MySQL database with binlog enabled.
- An available Kafka Connect service that can connect to the AutoMQ node.
- The Debezium MySQL plugin registered with Kafka Connect for monitoring and converting data change operations.
Quick Deployment
Deploy AutoMQ
Refer to the AutoMQ official documentation for setup: Deploy Multi-Nodes Cluster on Linux▸. You will obtain the service access address for AutoMQ, e.g., 192.168.123.41:9092, which Kafka Connect will later use to connect to AutoMQ.
Deploy MySQL
MySQL can be quickly deployed and configured using Docker with the official image provided by Debezium. This image includes some initial database tables, making the deployment process simpler. Use the following command to create a container named "mysql":
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.7
- MYSQL_ROOT_PASSWORD: Sets the password for the root user.
- MYSQL_USER and MYSQL_PASSWORD: Set the username and password for a standard user.
Connect to the MySQL client as the regular user:
docker exec -it mysql mysql -u mysqluser -pmysqlpw
Verify the data using the command-line tool to view all existing database tables:
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| inventory |
| performance_schema |
+--------------------+
mysql> use inventory;
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
Deploy Kafka Connect
Use the following command to pull the Kafka Connect image and start the container. Make sure to specify the AutoMQ service address:
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
-e BOOTSTRAP_SERVERS=192.168.123.41:9092 \
--link mysql:mysql quay.io/debezium/connect:2.7
Parameter Description:
| Parameter | Description |
|---|---|
| GROUP_ID | The group ID of the Kafka Connect cluster. |
| CONFIG_STORAGE_TOPIC | The AutoMQ topic for storing connector configurations. |
| OFFSET_STORAGE_TOPIC | The AutoMQ topic for storing connector offsets. |
| STATUS_STORAGE_TOPIC | The AutoMQ topic for storing connector statuses. |
| --link mysql:mysql | Links to the container named mysql. |
| -e BOOTSTRAP_SERVERS=192.168.123.41:9092 | Specifies the AutoMQ node address. |
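Before registering connectors, it can help to confirm that the Kafka Connect REST API is reachable. Below is a minimal sketch; the URL and timeout are assumptions for this particular setup:

```python
import json
import urllib.request

def connect_ready(base_url: str, timeout: float = 3.0) -> bool:
    """Return True if the Kafka Connect REST API responds at base_url."""
    try:
        with urllib.request.urlopen(base_url, timeout=timeout) as resp:
            info = json.loads(resp.read().decode("utf-8"))
            # A healthy Connect worker reports its version in the root resource.
            return "version" in info
    except (OSError, ValueError):
        return False

if connect_ready("http://localhost:8083"):
    print("Kafka Connect is up")
else:
    print("Kafka Connect is not reachable yet")
```

A loop around this check is a common way to wait for the worker to finish starting before submitting connector configurations.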
If the connection fails, please check if AutoMQ and MySQL services have started successfully and if the address provided is correct.
Create and Register Debezium MySQL Connector
The Debezium MySQL connector is a Kafka Connect plugin for monitoring data changes in a MySQL database. You can register it with Kafka Connect through its REST API using curl. First, create the connector configuration file:
# Create a configuration file in a directory of your choice
cd /home
vim mysql-connector.json
The JSON file content is:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "192.168.123.41:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
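If you prefer to keep addresses and names in one place, the same configuration can be assembled programmatically and written out to the JSON file. A hedged sketch (the values simply mirror the example configuration above):

```python
import json

def build_mysql_connector_config(name: str, bootstrap_servers: str,
                                 db_host: str = "mysql", db_port: str = "3306") -> dict:
    """Assemble a Debezium MySQL connector registration payload."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": db_host,
            "database.port": db_port,
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "184054",
            "topic.prefix": "dbserver1",
            "database.include.list": "inventory",
            "schema.history.internal.kafka.bootstrap.servers": bootstrap_servers,
            "schema.history.internal.kafka.topic": "schema-changes.inventory",
        },
    }

payload = build_mysql_connector_config("inventory-connector", "192.168.123.41:9092")
# Write the payload to the file submitted to Kafka Connect below.
with open("mysql-connector.json", "w") as f:
    json.dump(payload, f, indent=2)
```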
Use the following command to submit the connector configuration file to Kafka Connect:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @mysql-connector.json http://localhost:8083/connectors/
The successful response content is as follows:
HTTP/1.1 201 Created
Date: Mon, 05 Aug 2024 01:51:43 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 518
Server: Jetty(9.4.53.v20231009)
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","topic.prefix":"dbserver1","database.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"192.168.123.41:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}
Verify Data Change Capture
Insert, Update, and Delete Database Data
By performing data insertions, updates, and deletions in the MySQL console, verify whether the Kafka Connector can capture these changes:
-- insert
INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', 'john.doe@example.com');
-- update
UPDATE customers SET email='john.doe@newdomain.com' WHERE first_name='John' AND last_name='Doe';
-- delete
DELETE FROM customers WHERE first_name='John' AND last_name='Doe';
Examine AutoMQ Messages
Because the Kafka Connect logs alone make it hard to confirm capture, a more direct method is to inspect the topic data in AutoMQ: if the change events appear there, the capture succeeded. You can check topic data with command-line scripts or visual monitoring tools, as described below.
Detection Using Kafka Scripts
Download the AutoMQ project binary package from GitHub Releases [6]. After extracting it, run the following script from the project root directory to read the change events captured from the customers table:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.123.41:9092 --topic dbserver1.inventory.customers --from-beginning
Make sure to replace the AutoMQ service address.
Results are as follows:
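Each record on dbserver1.inventory.customers is a JSON change event in the Debezium envelope format. The sketch below shows how a downstream consumer might interpret one; the sample event is hand-written and trimmed for illustration:

```python
import json

# A trimmed, hand-written UPDATE event in the Debezium envelope format.
raw_event = """
{
  "payload": {
    "before": {"id": 1005, "email": "john.doe@example.com"},
    "after":  {"id": 1005, "email": "john.doe@newdomain.com"},
    "op": "u"
  }
}
"""

def describe_change(event_json: str) -> str:
    """Summarize a Debezium change event as a human-readable string."""
    payload = json.loads(event_json)["payload"]
    op = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT"}[payload["op"]]
    # DELETE events carry the row image in "before"; others in "after".
    row = payload["after"] or payload["before"]
    return f"{op} on row id={row['id']}"

print(describe_change(raw_event))  # → UPDATE on row id=1005
```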
Detection Using Visualization Tools
Various visualization tools can be used to view the status of AutoMQ nodes, including Redpanda Console [7] and Kafdrop [8], among others. Using Redpanda Console as an example, you can view all Topics data and detailed information for the current AutoMQ node. For a detailed tutorial on deploying Redpanda Console, refer to the official AutoMQ documentation: Redpanda Console | AutoMQ [9].
Here you can see all the database tables monitored by the connector, each corresponding to a Topic, together with the internal Topics holding the connector configuration, offsets, and status.
You can view detailed data change information, such as updates to customers:
To verify capture further, run additional SQL updates with distinctive values and look for the matching change events.
Clean up the Environment
You can easily clean up the Docker environment by executing the following command:
docker stop mysql connect
Since the --rm parameter was included at startup, the containers are automatically removed once they stop.
Summary
This article showed how to use Debezium to monitor changes in a MySQL database and send these change events to AutoMQ for processing. By deploying MySQL and Kafka Connect and configuring the Debezium MySQL connector, enterprises can monitor and process database changes in real time to meet business needs such as order modifications and inventory management. AutoMQ's efficiency and scalability, combined with Debezium's low latency and reliability, make this combination an ideal choice for real-time data processing. For more ways to consume change event messages from database monitoring, refer to: Debezium [10].
References
[1] Debezium: https://debezium.io/
[2] AutoMQ: https://www.automq.com/
[3] Kafka Connect: https://docs.confluent.io/platform/current/connect/index.html
[4] Debezium Structure: https://docs.redhat.com/zh_hans/documentation/red_hat_integration/2023.q2/html/debezium_user_guide/description-of-debezium-architecture
[5] Quick Start with AutoMQ: https://docs.automq.com/automq/getting-started
[6] GitHub Releases: https://github.com/AutoMQ/automq/releases
[7] Redpanda Console: https://redpanda.com/redpanda-console-kafka-ui
[8] Kafdrop: https://github.com/obsidiandynamics/kafdrop
[9] Redpanda Console | AutoMQ: https://docs.automq.com/automq/integrations/kafka-ui/redpanda-console
[10] Debezium: https://debezium.io/