From a0b1307e45b318904ade8a9baeeeb474cfdaaa9c Mon Sep 17 00:00:00 2001 From: kevin-wu24 <66326898+kevin-wu24@users.noreply.github.com> Date: Tue, 17 Dec 2024 10:49:58 -0600 Subject: [PATCH] make exception for when inter.broker.listener.name is inferred from security protocol and add test --- .../main/scala/kafka/server/KafkaConfig.scala | 12 +++++++----- .../unit/kafka/server/KafkaConfigTest.scala | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 8596164478d76..67f6e6489a81f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -911,11 +911,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) effectiveAdvertisedControllerListeners.size == listeners.size, s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller" ) - // controller.listener.names must not contain inter.broker.listener.name - require( - !controllerListenerNames.contains(interBrokerListenerName.value()), - s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain the '${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG}' configuration value when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller'" - ) + // controller.listener.names must not contain inter.broker.listener.name when inter.broker.listener.name is explicitly set + if (Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)).isDefined) { + require( + !controllerListenerNames.contains(interBrokerListenerName.value()), + s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain an explicitly set ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} configuration value when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller'" + ) + } validateControllerQuorumVotersMustContainNodeIdForKRaftController() validateAdvertisedControllerListenersNonEmptyForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 179a5db7ac755..25a76b861c520 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1406,6 +1406,23 @@ class KafkaConfigTest { KafkaConfig.fromProps(props) } + @Test + def testControllerListenerNamesValidForKRaftControllerOnly(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1") + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SASL_SSL://:9092,CONTROLLER://:9093") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "SASL_SSL:SASL_SSL,CONTROLLER:SASL_SSL") + props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SASL_SSL") + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER,SASL_SSL") + + val expectedExceptionContainsText = + """controller.listener.names must not contain an explicitly set inter.broker.listener.name configuration value + |when process.roles=controller""".stripMargin.replaceAll("\n", " ") + assertBadConfigContainingMessage(props, expectedExceptionContainsText) + } + @Test def testControllerQuorumVoterStringsToNodes(): Unit = { assertThrows(classOf[ConfigException], () => QuorumConfig.quorumVoterStringsToNodes(Collections.singletonList("")))