Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ All notable changes to this project will be documented in this file.
### Removed

- Refactor: remove unused RBAC cluster role ([#914]).
- Refactor: remove superfluous and partly misconfigured Kafka listeners CLIENT_AUTH and CONTROLLER_AUTH ([#915]).

[#911]: https://github.com/stackabletech/kafka-operator/pull/911
[#914]: https://github.com/stackabletech/kafka-operator/pull/914
[#915]: https://github.com/stackabletech/kafka-operator/pull/915

## [25.11.0] - 2025-11-07

Expand Down
76 changes: 37 additions & 39 deletions rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,40 @@ pub enum KafkaListenerProtocol {

#[derive(strum::Display, Debug, EnumString, Ord, Eq, PartialEq, PartialOrd)]
pub enum KafkaListenerName {
/// The purpose of this listener is to handle client/broker communications.
/// It can be configured to use the SSL or SASL_SSL (Kerberos) protocols
/// if the brokers use TLS for communication (and possible authentication).
/// The PLAINTEXT protocol is never really used since `spec.clusterConfig.tls`
/// uses `tls` as default value.
/// The advertised listener hosts are derived from the pod (broker) listener volume.
#[strum(serialize = "CLIENT")]
Client,
#[strum(serialize = "CLIENT_AUTH")]
ClientAuth,
/// The purpose if this listener is to handle broker internal communications.
/// The only protocol used here is SSL.
#[strum(serialize = "INTERNAL")]
Internal,
/// This is almost identical with the `Client` listener with the following exceptions:
///
/// - it is only defined if Kerberos is enabled
/// - it uses a different port
/// - the keystore associated with the listener volume uses a different CA
///
/// Note: the corresponding K8S service is *always* defined, not just if Kerberos is enabled
/// and it is published in the discovery ConfigMap for clients to consume.
#[strum(serialize = "BOOTSTRAP")]
Bootstrap,
/// This listener is defined when Kraft mode is enabled.
/// It is responsible for broker/controller as well as controller/controller communications
/// and therefore it is present on *both* brokers and controller properties files.
/// The only protocol used is SSL.
/// The advertised host names are FQDN pod names of the controllers.
///
/// Notes:
///
/// - there is no listener for client/controller communication
/// - this listener does not support SSL_SASL.
#[strum(serialize = "CONTROLLER")]
Controller,
#[strum(serialize = "CONTROLLER_AUTH")]
ControllerAuth,
}

impl KafkaListenerName {
Expand Down Expand Up @@ -179,28 +201,7 @@ pub fn get_kafka_listener_config(
BTreeMap::new();

// CLIENT
if kafka_security.tls_client_authentication_class().is_some() {
// 1) If client authentication required, we expose only CLIENT_AUTH connection with SSL
listeners.push(KafkaListener {
name: KafkaListenerName::ClientAuth,
host: LISTENER_LOCAL_ADDRESS.to_string(),
port: kafka_security.client_port().to_string(),
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::ClientAuth,
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port: node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name(),
),
});
listener_security_protocol_map
.insert(KafkaListenerName::ClientAuth, KafkaListenerProtocol::Ssl);
listener_security_protocol_map.insert(
KafkaListenerName::ControllerAuth,
KafkaListenerProtocol::Ssl,
);
} else if kafka_security.has_kerberos_enabled() {
if kafka_security.has_kerberos_enabled() {
// 2) Kerberos and TLS authentication classes are mutually exclusive
listeners.push(KafkaListener {
name: KafkaListenerName::Client,
Expand All @@ -217,12 +218,10 @@ pub fn get_kafka_listener_config(
});
listener_security_protocol_map
.insert(KafkaListenerName::Client, KafkaListenerProtocol::SaslSsl);
listener_security_protocol_map.insert(
KafkaListenerName::Controller,
KafkaListenerProtocol::SaslSsl,
);
} else if kafka_security.tls_server_secret_class().is_some() {
// 3) If no client authentication but tls is required we expose CLIENT with SSL
} else if kafka_security.tls_client_authentication_class().is_some()
|| kafka_security.tls_server_secret_class().is_some()
{
// 1) Client listener uses TLS (possibly with authentication)
listeners.push(KafkaListener {
name: KafkaListenerName::Client,
host: LISTENER_LOCAL_ADDRESS.to_string(),
Expand All @@ -239,7 +238,8 @@ pub fn get_kafka_listener_config(
listener_security_protocol_map
.insert(KafkaListenerName::Client, KafkaListenerProtocol::Ssl);
} else {
// 4) If no client auth or tls is required we expose CLIENT with PLAINTEXT
// 3) If no client auth or tls is required we expose CLIENT with PLAINTEXT
// This is actually never the case because
listeners.push(KafkaListener {
name: KafkaListenerName::Client,
host: LISTENER_LOCAL_ADDRESS.to_string(),
Expand Down Expand Up @@ -414,7 +414,7 @@ mod tests {
config.listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
name = KafkaListenerName::ClientAuth,
name = KafkaListenerName::Client,
host = LISTENER_LOCAL_ADDRESS,
port = kafka_security.client_port(),
internal_name = KafkaListenerName::Internal,
Expand All @@ -427,7 +427,7 @@ mod tests {
config.advertised_listeners(),
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
name = KafkaListenerName::ClientAuth,
name = KafkaListenerName::Client,
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port = node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
Expand All @@ -447,15 +447,13 @@ mod tests {
assert_eq!(
config.listener_security_protocol_map(),
format!(
"{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol},{controller_auth_name}:{controller_auth_protocol}",
name = KafkaListenerName::ClientAuth,
"{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}",
name = KafkaListenerName::Client,
protocol = KafkaListenerProtocol::Ssl,
internal_name = KafkaListenerName::Internal,
internal_protocol = KafkaListenerProtocol::Ssl,
controller_name = KafkaListenerName::Controller,
controller_protocol = KafkaListenerProtocol::Ssl,
controller_auth_name = KafkaListenerName::ControllerAuth,
controller_auth_protocol = KafkaListenerProtocol::Ssl,
)
);

Expand Down
62 changes: 20 additions & 42 deletions rust/operator-binary/src/crd/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,37 +531,9 @@ impl KafkaTlsSecurity {
// We set either client tls with authentication or client tls without authentication
// If authentication is explicitly required we do not want to have any other CAs to
// be trusted.
if self.tls_client_authentication_class().is_some() {
config.insert(
KafkaListenerName::ClientAuth.listener_ssl_keystore_location(),
format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR),
);
config.insert(
KafkaListenerName::ClientAuth.listener_ssl_keystore_password(),
Self::SSL_STORE_PASSWORD.to_string(),
);
config.insert(
KafkaListenerName::ClientAuth.listener_ssl_keystore_type(),
"PKCS12".to_string(),
);
config.insert(
KafkaListenerName::ClientAuth.listener_ssl_truststore_location(),
format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR),
);
config.insert(
KafkaListenerName::ClientAuth.listener_ssl_truststore_password(),
Self::SSL_STORE_PASSWORD.to_string(),
);
config.insert(
KafkaListenerName::ClientAuth.listener_ssl_truststore_type(),
"PKCS12".to_string(),
);
// client auth required
config.insert(
KafkaListenerName::ClientAuth.listener_ssl_client_auth(),
"required".to_string(),
);
} else if self.tls_server_secret_class().is_some() {
if self.tls_client_authentication_class().is_some()
|| self.tls_server_secret_class().is_some()
{
config.insert(
KafkaListenerName::Client.listener_ssl_keystore_location(),
format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR),
Expand All @@ -586,6 +558,13 @@ impl KafkaTlsSecurity {
KafkaListenerName::Client.listener_ssl_truststore_type(),
"PKCS12".to_string(),
);
if self.tls_client_authentication_class().is_some() {
// client auth required
config.insert(
KafkaListenerName::Client.listener_ssl_client_auth(),
"required".to_string(),
);
}
}

if self.has_kerberos_enabled() {
Expand Down Expand Up @@ -699,17 +678,6 @@ impl KafkaTlsSecurity {
pub fn controller_config_settings(&self) -> BTreeMap<String, String> {
let mut config = BTreeMap::new();

// We set either client tls with authentication or client tls without authentication
// If authentication is explicitly required we do not want to have any other CAs to
// be trusted.
if self.tls_client_authentication_class().is_some() {
// client auth required
config.insert(
KafkaListenerName::ControllerAuth.listener_ssl_client_auth(),
"required".to_string(),
);
}

if self.tls_client_authentication_class().is_some()
|| self.tls_internal_secret_class().is_some()
{
Expand Down Expand Up @@ -737,6 +705,16 @@ impl KafkaTlsSecurity {
KafkaListenerName::Controller.listener_ssl_truststore_type(),
"PKCS12".to_string(),
);
// We set either client tls with authentication or client tls without authentication
// If authentication is explicitly required we do not want to have any other CAs to
// be trusted.
if self.tls_client_authentication_class().is_some() {
// client auth required
config.insert(
KafkaListenerName::Controller.listener_ssl_client_auth(),
"required".to_string(),
);
}
}

// Kerberos
Expand Down
Loading