Table Topic Configuration
This document outlines the configuration of the AutoMQ Table Topic feature, which includes the Schema Registry, Catalog, and the setup of the AutoMQ server.
Schema Registry Configuration
The Schema Registry manages the Kafka Topic message Schema, ensuring data consistency and compatibility between producers and consumers. AutoMQ Table Topic utilizes the Schema Registry to parse Topic data and synchronize with the Iceberg table structure, supporting Schema evolution.
Constraints
Table Topic currently supports only the following Schema Registry implementations.
- Confluent Schema Registry: Facilitates schema management for Kafka and offers a REST API.
- Aiven Karapace: An open-source Schema Registry that is compatible with the Confluent Schema Registry's REST API.
Table Topic does not currently support authentication configurations for Schema Registry (such as Basic Auth, SASL, mTLS). You must configure the Schema Registry to permit anonymous access for AutoMQ clusters and ensure network security.
Configure Schema Registry
Specify the Schema Registry service address with the following cluster-level parameter.
Configuration Parameters:
automq.table.topic.schema.registry.url: The URL of the Schema Registry (e.g., http://schema-registry.example.com:8081). AutoMQ uses this URL to retrieve and synchronize the Schema.
Note:
- This must be configured when using automq.table.topic.schema.type=schema.
Example:
automq.table.topic.schema.registry.url=https://schema-registry.example.com:8081
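Because Table Topic needs anonymous access to the registry, it can be useful to confirm that the URL answers without credentials before putting it into the cluster configuration. The following is a minimal sketch, assuming the registry exposes the Confluent-compatible GET /subjects endpoint (which both supported implementations provide). The function name list_subjects and its fetch parameter are illustrative and not part of AutoMQ.

```python
import json
import urllib.request


def list_subjects(registry_url, fetch=urllib.request.urlopen):
    """Return the subjects registered in a Confluent-compatible Schema Registry.

    No credentials are sent, so a successful call suggests that the registry
    accepts anonymous requests from the host running this check.
    `fetch` is a hypothetical injection point so the HTTP call can be stubbed.
    """
    url = registry_url.rstrip("/") + "/subjects"
    with fetch(url, timeout=5) as response:
        return json.load(response)
```

Calling list_subjects with the value intended for automq.table.topic.schema.registry.url should return a JSON list of subject names. An HTTP 401 or 403 error would indicate that anonymous access is not permitted. Run the check from a host in the same network as the AutoMQ cluster, since reachability can differ between networks.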
Catalog Configuration
In Apache Iceberg, the Catalog plays a crucial role in managing table metadata. Its primary responsibilities include:
- Tracking the current metadata pointer for each Iceberg table. This pointer indicates the location of the latest metadata file (e.g., vN.metadata.json).
- Providing atomic operations to update this metadata pointer. This is critical for ensuring data consistency during commit processes such as writing new data or evolving schemas.
- Organizing tables into namespaces and providing methods to list, create, delete, and rename tables.
Essentially, a Catalog serves as the entry point for any Iceberg operation, guiding the query engine on where to find authoritative information regarding a table's schema, partitions, snapshots, and data files.
Configuring the Catalog
Table Topic utilizes an external Iceberg Catalog to manage the metadata of its "table topics."
- General Configuration Prefix: All Iceberg Catalog settings related to the AutoMQ Table Topic use the prefix automq.table.topic.catalog.*.
- Primary Configuration Key: automq.table.topic.catalog.type specifies the type of Iceberg Catalog.
AutoMQ Table Topic supports the following catalog types: rest, glue, tablebucket, nessie, hive.
Supported Catalog Types and Configuration
1. REST Catalog
This method uses the standard Iceberg REST Catalog service.
- Type Setting: automq.table.topic.catalog.type=rest
- Configuration Properties:
  - automq.table.topic.catalog.uri: The URI for the REST Catalog service (e.g., http://rest:8181).
  - automq.table.topic.catalog.warehouse: Defines the S3 path for the Iceberg data warehouse. If not set, it will default to the Iceberg directory of the DataBucket.
- Required Permissions:
  - Credentials (if authentication is needed) that allow listing, creating, reading, and updating table metadata.
  - AutoMQ requires read/write/delete permissions for the S3 warehouse path.
Example:
automq.table.topic.catalog.type=rest
automq.table.topic.catalog.uri=http://rest:8181
automq.table.topic.catalog.warehouse=s3://automq-bucket/wh/
2. AWS Glue Data Catalog
This method uses AWS Glue as the Iceberg Catalog.
- Type Setting: automq.table.topic.catalog.type=glue
- Configuration Property:
  - automq.table.topic.catalog.warehouse: Specifies the S3 path for the Iceberg data warehouse.
- Required Permissions:
  - AutoMQ requires database and table management permissions for AWS Glue.
  - AutoMQ needs read/write/delete permissions on the S3 warehouse path.
Example:
automq.table.topic.catalog.type=glue
automq.table.topic.catalog.warehouse=s3://automq-bucket/glue/
3. TableBucket (S3Table)
- Type Setting: automq.table.topic.catalog.type=tablebucket
- Configuration Property:
  - automq.table.topic.catalog.warehouse: Specifies the TableBucket ARN (e.g., arn:aws:s3tables:<region>:<account-id>:bucket/<bucket-name>).
- Required Permissions:
  - AutoMQ requires read and write access to the S3Table.
Example:
automq.table.topic.catalog.type=tablebucket
automq.table.topic.catalog.warehouse=arn:aws:s3tables:us-east-1:xxxxxx:bucket/xxxxx
4. Nessie Catalog
This method uses Project Nessie, a transactional data lake catalog with Git-like semantics.
- Type Setting: automq.table.topic.catalog.type=nessie
- Configuration Properties:
  - automq.table.topic.catalog.uri: Specifies the URI of the Nessie server (e.g., http://nessie-server:19120/api/v2).
  - automq.table.topic.catalog.warehouse: Defines the S3 path for the Iceberg data warehouse. If not set, it will default to the Iceberg directory of the DataBucket.
Example:
automq.table.topic.catalog.type=nessie
automq.table.topic.catalog.uri=http://nessie-server:19120/api/v2
automq.table.topic.catalog.warehouse=s3://automq-bucket/nessie/
5. Hive Metastore Catalog
This method uses an existing Hive Metastore as the Iceberg Catalog. When configuring Hive Metastore as the Catalog, refer to the Hive documentation on enabling Iceberg.
- Type Setting: automq.table.topic.catalog.type=hive
- Configuration Properties:
  - automq.table.topic.catalog.uri: Specifies the Hive Metastore URI (e.g., thrift://hostname:9083).
  - automq.table.topic.catalog.warehouse: Specifies the HDFS or S3 path for the Iceberg data warehouse. If not configured, the Iceberg directory of the DataBucket will be used.
- Authentication Configuration:
  - Simple Authentication: automq.table.topic.catalog.auth=simple://?username=xxx
  - Kerberos Authentication: automq.table.topic.catalog.auth=kerberos://?principal=base64(clientPrincipal)&keytab=base64(keytabFile)&krb5conf=base64(krb5confFile)
    - clientPrincipal: The client Kerberos principal, Base64 encoded.
    - keytabFile: The content of the client keytab file, Base64 encoded.
    - krb5confFile: The content of the krb5.conf file, Base64 encoded.
    - automq.table.topic.hadoop.metastore.kerberos.principal: Kerberos principal for the Hive Metastore server (e.g., hive/_HOST@REALM).
- Required Permissions:
  - Create, alter, and drop table permissions for the Hive Metastore.
  - Kerberos authentication requires a valid principal and keytab.
  - AutoMQ needs read, write, and delete permissions on the warehouse path (HDFS or S3).
Example:
automq.table.topic.catalog.type=hive
automq.table.topic.catalog.uri=thrift://hostname:9083
automq.table.topic.catalog.warehouse=s3://automq-bucket/hive/
automq.table.topic.catalog.auth=simple://?username=user&password=pass
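For Kerberos authentication, the value of automq.table.topic.catalog.auth embeds three Base64-encoded inputs, which is easy to get wrong by hand. The sketch below assembles the value by following the kerberos:// template in this section literally. It assumes the standard Base64 alphabet and does not percent-encode the values. This document specifies neither detail, so verify both against your AutoMQ version. The helper names are illustrative.

```python
import base64


def b64(data: bytes) -> str:
    """Standard Base64 (assumption: the alphabet is not named in this document)."""
    return base64.b64encode(data).decode("ascii")


def kerberos_auth_value(principal: str, keytab: bytes, krb5_conf: bytes) -> str:
    """Build the value for automq.table.topic.catalog.auth in Kerberos mode.

    Follows the template kerberos://?principal=...&keytab=...&krb5conf=...
    Values are inserted as-is, without percent-encoding (assumption).
    """
    return (
        "kerberos://?principal=" + b64(principal.encode("utf-8"))
        + "&keytab=" + b64(keytab)
        + "&krb5conf=" + b64(krb5_conf)
    )
```

In practice, the keytab and krb5.conf arguments would be the raw bytes of the respective files, read in binary mode (for example, open(path, "rb").read()), and the principal would be the client principal string such as a hypothetical automq@EXAMPLE.COM.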
TableTopic Configuration
The following configurations are all topic-level settings. They enable the TableTopic feature, adjust the Iceberg commit interval, and configure features such as partitioning and upsert.
Enabling the TableTopic Feature
The AutoMQ TableTopic feature allows for the conversion of data from a specified Kafka Topic into an Apache Iceberg table to support structured queries and analysis.
Configuration Parameters:
- automq.table.topic.enable: Determines whether TableTopic is enabled. Setting this to true creates an Iceberg table for data storage.
- automq.table.topic.namespace: Specifies the namespace for the Iceberg table under the Catalog, adhering to the naming conventions of the Catalog (such as a combination of letters, numbers, and underscores). Configure it when the Topic is created; it cannot be modified later.
- automq.table.topic.schema.type: Specifies the schema mode. Configure it when the Topic is created; it cannot be altered later. The default is schemaless. The following values are supported:
  - schemaless: Does not parse the message content. Only the built-in Kafka record fields timestamp, key, and value are written, without relying on the Schema Registry.
  - schema: Requires the Schema Registry integration in the cluster configuration, and the Producer must register the schema with the Schema Registry when producing messages to the Topic.
Example:
automq.table.topic.enable=true
automq.table.topic.namespace=default
automq.table.topic.schema.type=schema
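Since the namespace and schema type cannot be changed after creation, these settings are typically supplied when the topic is created. As a sketch, the snippet below builds an argument list for Kafka's standard kafka-topics.sh tool, passing each topic-level setting through a separate --config flag. The topic name orders and the broker address localhost:9092 are hypothetical, and the helper function is illustrative only.

```python
def create_topic_command(topic, bootstrap_server, configs):
    """Build an argv list for kafka-topics.sh that creates `topic`
    with the given topic-level configs (one --config flag per entry)."""
    argv = [
        "kafka-topics.sh",
        "--bootstrap-server", bootstrap_server,
        "--create",
        "--topic", topic,
    ]
    for key, value in configs.items():
        argv += ["--config", f"{key}={value}"]
    return argv


table_topic_configs = {
    "automq.table.topic.enable": "true",
    "automq.table.topic.namespace": "default",
    "automq.table.topic.schema.type": "schema",
}

# Hypothetical topic name and broker address.
command = create_topic_command("orders", "localhost:9092", table_topic_configs)
print(" ".join(command))
```

Passing the list to subprocess.run(command, check=True) executes it without going through a shell, which avoids quoting problems for values that contain spaces or brackets.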
Configuring the Commit Interval
To enhance real-time analytics, you can modify the commit frequency of an Iceberg table to improve data freshness.
Configuration Parameters:
automq.table.topic.commit.interval.ms: Specifies the data commit interval in milliseconds. The default is 300,000 (5 minutes), with a maximum of 900,000 (15 minutes). Shorter intervals improve data freshness but increase processing overhead.
Note:
- Frequent commits may result in commit conflicts and metadata file bloat, which can raise storage and query costs.
- AutoMQ automatically deletes snapshots that are older than one hour to prevent the metadata.json file from becoming excessively large over time.
- Regular table maintenance through compaction is necessary to merge small files, which keeps manifest files compact and improves query performance.
Example:
automq.table.topic.commit.interval.ms=60000
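To make the freshness versus overhead trade-off concrete, the sketch below converts an interval in seconds to the millisecond value and estimates how many commits per day that interval implies. The default and maximum constants come from this section. The lower bound (any positive value) is an assumption because no minimum is stated here, and the commit count assumes exactly one commit per interval.

```python
DEFAULT_COMMIT_INTERVAL_MS = 300_000  # 5 minutes, the documented default
MAX_COMMIT_INTERVAL_MS = 900_000      # 15 minutes, the documented maximum


def commit_interval_ms(seconds):
    """Convert seconds to a value for automq.table.topic.commit.interval.ms.

    Rejects non-positive values (assumed lower bound) and values above the
    documented maximum.
    """
    ms = int(seconds * 1000)
    if ms <= 0 or ms > MAX_COMMIT_INTERVAL_MS:
        raise ValueError(
            f"commit interval {ms} ms is outside 1..{MAX_COMMIT_INTERVAL_MS}"
        )
    return ms


def commits_per_day(interval_ms):
    """Approximate commits per day for one topic, assuming one per interval."""
    return 86_400_000 // interval_ms
```

Under these assumptions, the 60,000 ms setting from the example corresponds to 1,440 commits per day, compared with 288 at the default, which is why shorter intervals call for more attention to snapshot expiry and compaction.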
Optimize Query Performance: Partition Configuration
To enhance the query performance of Iceberg tables, especially in scenarios involving selective filtering, partitioning rules can be configured.
Configuration Parameters:
automq.table.topic.partition.by: Specifies partitioning rules, supporting partitioning by fields or functions. For example, [bucket(name, 3), month(timestamp)] means partitioning by hashing the name field into 3 buckets and partitioning by the month of the timestamp.
Supported Partition Strategies:
- Bucket Partitioning: bucket(field, N), partitions by the hash value of the field.
- Truncate Partitioning: truncate(field, N), partitions by the truncated value of the field.
- Temporal Partitioning: year(timestamp), month(timestamp), day(timestamp), hour(timestamp).
For more details, refer to Iceberg Partitioning Documentation.
Note:
- Having too many partitions may increase metadata management overhead, leading to query planning delays and higher storage costs. It is recommended to set a reasonable number of partitions based on data volume and query patterns, and to regularly optimize small files through Compaction.
Example:
automq.table.topic.partition.by=[bucket(name, 3), month(timestamp)]
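To show what these transforms compute, the sketch below reimplements the truncate and temporal transforms as defined in the Iceberg table specification. It assumes AutoMQ delegates to Iceberg's standard transforms and that timestamps are timezone-aware UTC values. The bucket transform is omitted because it relies on a 32-bit Murmur3 hash that is not in the Python standard library. The sample record is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def truncate(value, width):
    """Integers round down to a multiple of `width`;
    strings keep their first `width` characters."""
    if isinstance(value, int):
        return value - (value % width)
    return value[:width]


def year(ts):
    """Whole years since 1970."""
    return ts.year - 1970


def month(ts):
    """Whole months since 1970-01."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def day(ts):
    """Whole days since 1970-01-01."""
    return (ts - EPOCH).days


def hour(ts):
    """Whole hours since 1970-01-01 00:00 UTC."""
    return (ts - EPOCH) // timedelta(hours=1)


# Hypothetical record; shows the partition values each transform yields.
record = {
    "name": "alice",
    "amount": 1234,
    "timestamp": datetime(2024, 3, 15, 10, 30, tzinfo=timezone.utc),
}
print(truncate(record["name"], 3), truncate(record["amount"], 100))
print(year(record["timestamp"]), month(record["timestamp"]))
```

Records that map to the same transform output land in the same partition, so coarser transforms such as month produce fewer, larger partitions than hour.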
Upsert and CDC Mode Support
To facilitate Upsert or Change Data Capture (CDC) operations, you can enable the following features for dynamic data management. A primary key must be configured to support row-level operations.
Configuration Parameters:
- automq.table.topic.id.columns: Specifies the primary key columns (composite keys are supported), such as [region, name].
- automq.table.topic.upsert.enable: Indicates whether to enable Upsert mode. Set to true to have the system insert or update records based on the primary key.
- automq.table.topic.cdc.field: Specifies the name of the record field that holds the CDC operation type. The field value can be I (insert), U (update), or D (delete).
Note:
- To enable Upsert or CDC mode, it is necessary to configure automq.table.topic.id.columns; otherwise, only append-only writes will be supported.
- Iceberg V2 tables support row-level operations:
  - Write Logic: Inserted or updated records are kept in data files, while deleted records are marked via delete files (which include the primary key and a deletion marker). An Upsert operation might create a new data file, and a CDC delete operation produces a delete file.
  - Query Logic: The Merge-on-Read (MOR) mechanism combines data files and delete files during queries, filtering out records marked for deletion using equality deletes.
- Run Compaction regularly to merge data files and delete files and keep query performance stable.
Example:
- Enable Upsert Mode:
automq.table.topic.upsert.enable=true
automq.table.topic.id.columns=[id, name]
- Enable CDC Mode:
automq.table.topic.cdc.field=op_type
automq.table.topic.id.columns=[id, name]
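The effect of these settings on query results can be modeled with a small replay loop. This is a conceptual sketch only: it reproduces the row set a reader would see after Merge-on-Read, not Iceberg's data file and delete file layout, and it is not AutoMQ's implementation. The field name op_type matches the example above; the sample records are hypothetical.

```python
def apply_changes(records, id_columns, cdc_field=None):
    """Replay records into a dict keyed by the primary key.

    With `cdc_field` set, the operation is read from that field of each
    record (I = insert, U = update, D = delete). Without it, every record
    is treated as an upsert.
    """
    table = {}
    for record in records:
        key = tuple(record[column] for column in id_columns)
        op = record[cdc_field] if cdc_field else "U"
        if op == "D":
            table.pop(key, None)   # delete removes the row if present
        else:
            table[key] = record    # insert and update both overwrite by key
    return table


changes = [
    {"id": 1, "name": "a", "qty": 1, "op_type": "I"},
    {"id": 1, "name": "a", "qty": 5, "op_type": "U"},
    {"id": 2, "name": "b", "qty": 2, "op_type": "I"},
    {"id": 2, "name": "b", "qty": 2, "op_type": "D"},
]
state = apply_changes(changes, id_columns=["id", "name"], cdc_field="op_type")
print(state)
```

Only the row with key (1, "a") remains, carrying the updated quantity. Without id columns there is no key to match on, which is why the table falls back to append-only writes in that case.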