Foreword
Due to the comprehensive compatibility of AutoMQ[1] with Kafka[2], the implementation of SASL security authentication configuration in AutoMQ is identical to that in Kafka. Through this article, you will learn how to securely use AutoMQ by configuring SASL.
Prerequisites
Each server must define a set of listeners to receive requests from clients and other servers. All services are exposed through listeners. This article focuses on how to build a secure Kafka environment around listeners, so before diving into the subsequent content, you need to have a general understanding of listeners.
Listener Configuration
We can configure each listener to authenticate clients using various mechanisms and ensure that traffic between the server and clients is encrypted.
Kafka servers support listening to connections on multiple ports. This is configured through the listeners property in the server configuration, which accepts a comma-separated list of listeners. Each server must define at least one listener. The format for each listener defined in listeners is as follows:
{LISTENER_NAME}://{hostname}:{port}
Examples:
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
## Note that the hostname here is optional, the absence of hostname represents binding to 0.0.0.0 i.e., all interfaces
## The LISTENER_NAME here corresponds to SASL_PLAINTEXT and CONTROLLER
LISTENER_NAME is typically a descriptive name used to define the purpose of the listener. For example, many configurations use separate listeners to handle client traffic, so they might name the corresponding listener as CLIENT in the configuration.
listeners=CLIENT://localhost:9092
The reason why LISTENER_NAME is typically a descriptive name is that we can directly assign LISTENER_NAME as the protocol name, thus skipping the protocol configuration step.
Using the example above, we can use the following definition to skip the definition of the CLIENT listener.
listeners=PLAINTEXT://localhost:9092
This example specifies that the listener on localhost:9092 uses the PLAINTEXT protocol.
However, the official Kafka documentation does not recommend using this naming convention.
We recommend that users provide explicit names for listeners to clearly indicate their intended usage.
If you usealiases to name listeners , the security protocols for these listeners need to be defined in a separate configuration : listener.security.protocol.map. This value is a comma-separated list that maps each listener to its security protocol. For example, the following configuration specifies that the CLIENT listener uses SSL, while the BROKER listener uses PLAINTEXT.
listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
The optional Kafka security protocols (case-insensitive ) are as follows:
- PLAINTEXT 
- SSL 
- SASL_PLAINTEXT 
- SASL_SSL 
The PLAINTEXT protocol does not provide security, and all protocols other than PLAINTEXT require additional configuration (note that this additional configuration refers to settings required by the protocol itself, not the above-mentioned mapping configuration).
Configuration Items
Here, we need to provide a detailed explanation of the listener-related configuration items to help readers understand the subsequent content.
Before explaining this configuration item, let's clarify a few concepts.
- VPC refers to a Virtual Private Cloud based on a cloud computing platform, where Brokers within the same VPC can communicate with each other through private network IPs. 
- Different servers deployed within the same VPC can communicate via both public and private network IPs. 
- For a given VPC, any Producer or Consumer launched within it is referred to as an Internal Client; otherwise, it is referred to as an External Client. The diagram below uses VPC 1 as the main perspective. 
- Brokers in a VPC have internal network IPs and can also have public network IPs. 

The above represents a very common Kafka cluster scenario, where the dashed arrows in the diagram indicate communication established through different Listeners in Kafka. These Listeners, still using VPC 1 as the main perspective, are categorized into Internal Listener and External Listener. Their functions are as follows:

The creation of these Listeners and how internal and external communications occur are determined by the Listener configuration items. Below, we will detail these configuration items.
listeners
Purpose: Used to specify the address and port on which the Kafka broker listens for TCP connections.
In the above diagram, all red and blue Listeners are represented.
The configuration format is as follows:
liseners={listenreName}:{hostname}//{port},{listenreName2}:{hostname2}//{port2}
For example:
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
## Note that the hostname here is optional, the absence of hostname represents binding to 0.0.0.0 i.e., all interfaces
It has the following features:
- Multiple configurations can be set simultaneously and separated by commas. 
- The listener's name and port must both be unique ; there cannot be two listeners with the same name even if their ports are different. 
- If the hostname is empty, for example (listeners = ://:port), it represents binding to 0.0.0.0, meaning all interfaces. 
- Setting the hostname to 0.0.0.0 will bind to all interfaces, meaning all interface requests will be accepted and processed. However, note that when setting it to 0.0.0.0, advertised.listeners must be set. The detailed reason will be explained below. 
- listenerName is the name of the listener, a unique value. It is not a security protocol but simply a mapping to the security protocol name in the default configuration. The default mappings are as follows: 
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
Regarding the default interface bound by the hostname, many tutorials online, including the official documentation, describe it as binding to the default interface.



advertised.listeners
Many Chinese tutorials assume the default interface binds to localhost, but in reality, the source code binds to 0.0.0.0 by default, which listens on all interfaces.
In an IaaS environment, this may need to bind to a different interface than the Broker. If not set, the value of listeners will be used. Unlike listeners, publishing the 0.0.0.0 address is invalid. Additionally, unlike listeners, this property can have duplicate ports, allowing one listener to be configured as the address of another listener. This is useful in some cases where an external load balancer is used.
The configuration format is as follows:
advertised.listeners={listenreName}:{hostname}//{port},{listenreName2}:{hostname2}//{port2}
It has the following characteristics:
- By default, if advertised.listeners is not set, it will automatically use the value of the listeners property. 
- Setting it to 0.0.0.0 is not supported. The source code requires that if listeners is set to 0.0.0.0, the advertised.listeners property must be set because other Brokers and Clients need to know your specific IP and port. 

- Multiple configurations can be specified simultaneously, separated by commas.
listener.security.protocol.map
A collection of mappings between listener names and security protocols. If a listener name is not a security protocol, you must set listener.security.protocol.map.
The configuration format is as follows:
listener.security.protocol.map={Listener Name}:{Security Protocol Name},{Listener Name2}:{Security Protocol Name},{Listener Name2}:{Security Protocol Name2}
The attribute is described in KafkaConfig.scala as follows:

Its default value is configured in Defaults.java:

Therefore, once your listener has a custom name, you need to configure this mapping. After configuration, the default mapping will be overridden. All you need to do is configure the required mappings for all listener names . In fact, the Kafka official website also recommends specifying all required mappings clearly, rather than using the default values.
inter.broker.listener.name
The Listener name used for communication between Brokers. If not set, the Listener name is defined by security.inter.broker.protocol (which defaults to PLAINTEXT).
security.inter.broker.protocol and inter.broker.listener.name cannot be set simultaneously.
Configuration Format:
inter.broker.listener.name={Listener Name}
It is worth noting that if this property is used, the advertised.listeners property must also be set, and the name configured must be included in advertised.listeners. The rationale is straightforward: the purpose of setting this property is to enable communication between Brokers. The communication process involves using the locally configured listener name to locate other Brokers' listener EndPoints. Therefore, typically, all Broker listener names within a cluster must be consistent; otherwise, the corresponding EndPoint cannot be found, and normal requests cannot be initiated. Setting advertised.listeners ensures that the Broker itself can also communicate with other Brokers. This is strictly enforced in the Kafka source code KafkaConfig.scala, where it specifies that the names in advertised.listeners must be configured in listeners as well.
val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
  // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
  validateAdvertisedListenersNonEmptyForBroker()
  require(advertisedListenerNames.contains(interBrokerListenerName),
    s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
      s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
  require(advertisedListenerNames.subsetOf(listenerNames),
    s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
      s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
      s"are ${listenerNames.map(_.value).mkString(",")}"
  )
}
security.inter.broker.protocol
The security protocol used for communication between Brokers has only the following valid values:
- PLAINTEXT 
- SSL 
- SASL_PLAINTEXT 
- SASL_SSL 
The difference between it and inter.broker.listener.name is that this configuration has only four options, all of which are security protocols, whereas inter.broker.listener.name is a listener name that needs to be mapped to its security protocol and IP:PORT.
If inter.broker.listener.name is not configured, the default configuration of security.inter.broker.protocol will be used.
Generally, if a custom listener name is defined, inter.broker.listener.name must be set and cannot be replaced by security.inter.broker.protocol.
Note
In a KRaft cluster, a Broker refers to any server that has the broker role enabled in process.roles, while a Controller refers to any server with the controller role enabled. The listener configuration depends on the role. The listener defined by inter.broker.listener.name is specifically used for handling requests between Brokers. On the other hand, a Controller must use a separate listener name defined by the controller.listener.names configuration. This listener name cannot be set to the same value as inter.broker.listener.name.
The Controller receives requests from other Controllers as well as from Brokers. Therefore, even if a server does not have the controller role enabled (i.e., it is just a Broker), it still must define the Controller listener and configure any necessary security properties to send requests to other Controllers as specified in the configuration. For example:
process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
In this example, the Controller listener is configured to use the SASL_SSL security protocol but is not included in the listeners because the Broker itself does not expose the Controller listener. The port used in this example comes from the controller.quorum.voters configuration, which defines the complete list of Controllers.
For KRaft servers with both Broker and Controller roles enabled, the configuration approach is similar. The only difference is that the Controller listener must be included in the listeners:
process.roles=broker,controller
listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
The port defined in controller.quorum.voters must exactly match one of the exposed Controller listeners. For example, here the CONTROLLER listener is bound to port 9093. Thus, the connection string defined by controller.quorum.voters must also use port 9093, as shown here.
The Controller will accept requests on all listeners defined by controller.listener.names. Typically, there is only one Controller listener, but there can be more. For instance, during a cluster rolling upgrade, the active listener might be switched from one port or security protocol to another (one roll to expose the new listener, another to remove the old listener). When multiple controller listeners are defined, the first one in the list is used for outgoing requests.
The traditional practice in Kafka is to configure separate listeners for clients to achieve network-level isolation. This method ensures that inter-cluster communication is separated from client communication. In KRaft mode, Controller listeners also need to be isolated, as they are only used for internal cluster management and not by clients. Clients should connect to other listeners configured on the Brokers. Any requests bound to the Controller will be forwarded as follows:
In a KRaft cluster, clients send administrative requests like CreateTopics and DeleteTopics to the Broker listeners. The Broker then forwards the requests to the active Controller using the first listener configured in controller.listener.names.
Using SASL for Authentication
Simple Authentication and Security Layer (SASL). Kafka uses Java Authentication and Authorization Service (JAAS) for SASL configuration.
Configuring JAAS for Kafka Broker
There are two ways to configure JAAS for Kafka. One way is to use a separate JAAS configuration file as a JVM parameter, and the other is to configure it through the sasl.jaas.config property in Kafka's configuration file. This article will use the second method.
Configuring JAAS through JAAS Configuration File
The KafkaServer section is part of the JAAS file used by each Kafka server/broker. This section provides SASL configuration options for the broker, including any SASL client connections established for inter-broker communication. If multiple listeners are configured to use SASL, this section can use the lowercase listener name as a prefix followed by a period, for example: sasl_ssl.KafkaServer. If only one listener is configured for SASL, there is no need to specify the listener name.
The Client section is used to authenticate SASL connections with Zookeeper. Since AutoMQ uses Kraft mode and discards Zookeeper, this part is not described.
The specific kafka_server_jaas.conf file content is as follows:
yyy.KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="_automq"
    password="automq-secret"
    user__automq="automq-secret"; # Note that the ";" here is essential.
};
## The username and password are configurations used for inter-Broker communication
## In addition, a user named _automq is defined with the password automq-secret
## Further configurations beginning with other listener names can be set up as follows
xxx.KafkaServer{
    ...
};
The JAAS configuration file location can be specified as a JVM parameter to take effect.
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
Configure JAAS through the sasl.jaas.config property.
Brokers can directly configure JAAS in the configuration file using the sasl.jaas.config property. The property name must be prefixed with the listener name and SASL mechanism, i.e., listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config, and the configuration value can specify only one login module . If multiple mechanisms are configured on a listener, configurations must be provided for each mechanism using the listener and mechanism prefix. For example:
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_alice="alice-secret";
If multiple configuration methods are used, they take effect in the following order:
- Broker configuration property listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config 
- Static JAAS configuration of {listenerName}.KafkaServer 
- Static JAAS configuration of KafkaServer 
SASL configuration
SASL can use either PLAINTEXT or SSL as the transport layer, so the security protocol can be set to SASL_PLAINTEXT or SASL_SSL. If using SASL_SSL, SSL must also be configured.
SASL Mechanisms
Kafka supports the following SASL mechanisms:
- GSSAPI (Kerberos)[4] 
- PLAIN[5] 
- OAUTHBEARER[8] 
This article only covers the configuration for the PLAIN mechanism. For other mechanisms, please refer to the official documentation.
SASL Configuration for Broker
Configure the SASL port in the server.properties file by adding at least one of SASL_PLAINTEXT or SASL_SSL to the listeners parameter, which includes one or more comma-separated values:
listeners=SASL_PLAINTEXT://hostName:port,SASL_SSL://hostName2:prot2
If only configuring the SASL port (or if you want Kafka Brokers to use mutual SASL authentication), ensure that the same SASL protocol is set for inter-broker communication:
security.inter.broker.protocol=SASL_PLAINTEXT
The overall SASL required configuration is as follows:
listeners=BROKER_SASL://:9092,CONTROLLER_SASL://:9093
inter.broker.listener.name=BROKER_SASL
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.mechanism.controller.protocol=PLAIN
listener.name.broker_sasl.plain.connections.max.reauth.ms=10000
controller.listener.names=CONTROLLER_SASL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER_SASL:SASL_PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER_SASL:SASL_PLAINTEXT
listener.name.broker_sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="_automq" \
   password="automq-secret" \
   user__automq="automq-secret";
listener.name.controller_sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="_automq" \
   password="automq-secret" \
   user__automq="automq-secret";
By starting the server with the above configuration, we get an AutoMQ server with SASL authentication.
SASL Configuration for Client
The client configuration file content is as follows:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="_automq" password="automq-secret";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Start AutoMQ with Configuration File
For instructions on how to quickly deploy and start a single-node AutoMQ instance, please refer to the official documentation [Direct S3 Cluster Deployment | AutoMQ]
export KAFKA_S3_ACCESS_KEY=<your-ak>
export KAFKA_S3_SECRET_KEY=<your-sk>
bin/kafka-server-start.sh /root/automq/config/kraft/server.properties
Test Connection
Create Topic:
bin/kafka-topics.sh --bootstrap-server 47.253.200.218:9092 --command-config /root/automq/bin/sasl-client.properties --create --topic test2

Created Successfully
Producer & Consumer Test:
bin/kafka-console-producer.sh --bootstrap-server 47.253.200.218:9092 --topic test2 --producer.config /root/automq/bin/sasl-client.properties
bin/kafka-console-consumer.sh --bootstrap-server 47.253.200.218:9092 --topic test2 --consumer.config /root/automq/bin/sasl-client.properties

Message Sent and Received Successfully
Summary
To configure SASL properly, you need to have a good understanding of Kafka listeners. The name of a listener can affect other configurations, so it is important to name listeners clearly and concisely to facilitate future troubleshooting.
References
[1] AutoMQ: https://www.automq.com/zh
[2] Kafka: https://kafka.apache.org/
[3] JAAS: https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html
[4] GSSAPI: https://kafka.apache.org/documentation/#security_sasl_kerberos
[5] PLAIN: https://kafka.apache.org/documentation/#security_sasl_plain
[6] SCRAM-SHA-256: https://kafka.apache.org/documentation/#security_sasl_scram
[7] SCRAM-SHA-512: https://kafka.apache.org/documentation/#security_sasl_scram
[8] OAUTHBEARER: https://kafka.apache.org/documentation/#security_sasl_oauthbearer




















.png)







