Skip to content

Commit f5cce7e

Browse files
jsancioabhi-ksolves
authored andcommitted
MINOR; Fix incompatible change to the kafka config (apache#16464)
Prior to KIP-853, users were not allow to enumerate listeners specified in `controller.listener.names` in the `advertised.listeners`. This decision was made in 3.3 because the `controller.quorum.voters` property is in effect the list of advertised listeners for all of the controllers. KIP-853 is moving away from `controller.quorum.voters` in favor of a dynamic set of voters. This means that the user needs to have a way of specifying the advertised listeners for controller. This change allows the users to specify listener names in `controller.listener.names` in `advertised.listeners`. To make this change forwards compatible (use a valid configuration from 3.8 in 3.9), the controller's advertised listeners are going to get computed by looking up the endpoint in `advertised.listeners`. If it doesn't exist, the controller will look up the endpoint in the `listeners` configuration. This change also includes a fix the to the BeginQuorumEpoch request where the default value for VoterId was 0 instead of -1. Reviewers: Colin P. McCabe <[email protected]>
1 parent 247bbfd commit f5cce7e

File tree

11 files changed

+107
-42
lines changed

11 files changed

+107
-42
lines changed

clients/src/main/resources/common/message/BeginQuorumEpochRequest.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
"fields": [
2525
{ "name": "ClusterId", "type": "string", "versions": "0+",
2626
"nullableVersions": "0+", "default": "null"},
27-
{ "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId", "ignorable": true,
28-
"about": "The voter ID of the receiving replica" },
27+
{ "name": "VoterId", "type": "int32", "versions": "1+", "ignorable": true, "default": "-1", "entityType": "brokerId",
28+
"about": "The replica id of the voter receiving the request" },
2929
{ "name": "Topics", "type": "[]TopicData",
3030
"versions": "0+", "fields": [
3131
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",

core/src/main/scala/kafka/server/BrokerServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ class BrokerServer(
255255
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
256256

257257
val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
258-
config.effectiveAdvertisedListeners.map(_.toJava).asJava).
258+
config.effectiveAdvertisedBrokerListeners.map(_.toJava).asJava).
259259
withWildcardHostnamesResolved().
260260
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
261261

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -1069,7 +1069,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
10691069
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
10701070
val oldConfig = server.config
10711071
val newListeners = listenersToMap(newConfig.listeners)
1072-
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners)
1072+
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners)
10731073
val oldListeners = listenersToMap(oldConfig.listeners)
10741074
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
10751075
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
@@ -1093,8 +1093,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
10931093

10941094
// Currently, we do not support adding or removing listeners when in KRaft mode.
10951095
// However, we support changing other listener configurations (max connections, etc.)
1096-
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
1097-
listenersToMap(newConfig.effectiveAdvertisedListeners))) {
1096+
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
1097+
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
10981098
verifyListenerRegistrationAlterationSupported()
10991099
}
11001100
}
@@ -1111,8 +1111,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
11111111
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
11121112
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
11131113
}
1114-
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
1115-
listenersToMap(newConfig.effectiveAdvertisedListeners))) {
1114+
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
1115+
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
11161116
verifyListenerRegistrationAlterationSupported()
11171117
server match {
11181118
case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)

core/src/main/scala/kafka/server/KafkaConfig.scala

+24-14
Original file line numberDiff line numberDiff line change
@@ -789,11 +789,17 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
789789
}
790790

791791
def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
792-
// Only expose controller listeners
793-
advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
792+
val controllerAdvertisedListeners = advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
793+
val controllerListenersValue = controllerListeners
794+
795+
controllerListenerNames.flatMap { name =>
796+
controllerAdvertisedListeners
797+
.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
798+
.orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))))
799+
}
794800
}
795801

796-
def effectiveAdvertisedListeners: Seq[EndPoint] = {
802+
def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
797803
// Only expose broker listeners
798804
advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
799805
}
@@ -919,7 +925,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
919925
" to prevent frequent changes in ISR")
920926
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
921927
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
922-
val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
928+
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
923929

924930
// validate KRaft-related configs
925931
val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
@@ -938,7 +944,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
938944
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
939945
}
940946
def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
941-
require(advertisedListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())),
947+
require(advertisedBrokerListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())),
942948
s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
943949
}
944950
def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
@@ -955,7 +961,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
955961
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
956962
}
957963
def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
958-
require(advertisedListenerNames.nonEmpty,
964+
require(advertisedBrokerListenerNames.nonEmpty,
959965
"There must be at least one advertised listener." + (
960966
if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
961967
}
@@ -992,7 +998,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
992998
validateControlPlaneListenerEmptyForKRaft()
993999
// listeners should only contain listeners also enumerated in the controller listener
9941000
require(
995-
effectiveAdvertisedListeners.isEmpty,
1001+
effectiveAdvertisedControllerListeners.size == listeners.size,
9961002
s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
9971003
)
9981004
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
@@ -1032,25 +1038,29 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
10321038
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
10331039
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
10341040
validateAdvertisedListenersNonEmptyForBroker()
1035-
require(advertisedListenerNames.contains(interBrokerListenerName),
1041+
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
10361042
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
1037-
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
1038-
require(advertisedListenerNames.subsetOf(listenerNames),
1043+
s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
1044+
require(advertisedBrokerListenerNames.subsetOf(listenerNames),
10391045
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} listener names must be equal to or a subset of the ones defined in ${SocketServerConfigs.LISTENERS_CONFIG}. " +
1040-
s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
1046+
s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
10411047
s"are ${listenerNames.map(_.value).mkString(",")}"
10421048
)
10431049
}
10441050

1045-
require(!effectiveAdvertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
1051+
require(!effectiveAdvertisedBrokerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
1052+
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
1053+
s"Use a routable IP address.")
1054+
1055+
require(!effectiveAdvertisedControllerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
10461056
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
10471057
s"Use a routable IP address.")
10481058

10491059
// validate control.plane.listener.name config
10501060
if (controlPlaneListenerName.isDefined) {
1051-
require(advertisedListenerNames.contains(controlPlaneListenerName.get),
1061+
require(advertisedBrokerListenerNames.contains(controlPlaneListenerName.get),
10521062
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
1053-
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
1063+
s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
10541064
// controlPlaneListenerName should be different from interBrokerListenerName
10551065
require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
10561066
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " +

core/src/main/scala/kafka/server/KafkaServer.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ class KafkaServer(
462462
raftManager.startup()
463463

464464
val networkListeners = new ListenerCollection()
465-
config.effectiveAdvertisedListeners.foreach { ep =>
465+
config.effectiveAdvertisedBrokerListeners.foreach { ep =>
466466
networkListeners.add(new Listener().
467467
setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
468468
setName(ep.listenerName.value()).
@@ -752,14 +752,14 @@ class KafkaServer(
752752
}
753753

754754
def createBrokerInfo: BrokerInfo = {
755-
val endPoints = config.effectiveAdvertisedListeners.map(e => s"${e.host}:${e.port}")
755+
val endPoints = config.effectiveAdvertisedBrokerListeners.map(e => s"${e.host}:${e.port}")
756756
zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
757757
val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
758758
require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
759759
s" advertised listeners are already registered by broker ${broker.id}")
760760
}
761761

762-
val listeners = config.effectiveAdvertisedListeners.map { endpoint =>
762+
val listeners = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
763763
if (endpoint.port == 0)
764764
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
765765
else
@@ -1107,7 +1107,7 @@ class KafkaServer(
11071107

11081108
/** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */
11091109
def advertisedListeners: Seq[EndPoint] = {
1110-
config.effectiveAdvertisedListeners.map { endPoint =>
1110+
config.effectiveAdvertisedBrokerListeners.map { endPoint =>
11111111
endPoint.copy(port = boundPort(endPoint.listenerName))
11121112
}
11131113
}

core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -1515,7 +1515,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
15151515
private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
15161516
val configs = servers.map { server =>
15171517
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
1518-
val newListeners = server.config.effectiveAdvertisedListeners.map { e =>
1518+
val newListeners = server.config.effectiveAdvertisedBrokerListeners.map { e =>
15191519
if (e.listenerName.value == SecureExternal)
15201520
s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
15211521
else
@@ -1527,7 +1527,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
15271527
adminClient.alterConfigs(configs).all.get
15281528
servers.foreach { server =>
15291529
TestUtils.retry(10000) {
1530-
val externalListener = server.config.effectiveAdvertisedListeners.find(_.listenerName.value == SecureExternal)
1530+
val externalListener = server.config.effectiveAdvertisedBrokerListeners.find(_.listenerName.value == SecureExternal)
15311531
.getOrElse(throw new IllegalStateException("External listener not found"))
15321532
assertEquals(newHost, externalListener.host, "Config not updated")
15331533
}

core/src/test/scala/unit/kafka/network/SocketServerTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ class SocketServerTest {
367367
val config = KafkaConfig.fromProps(testProps)
368368
val testableServer = new TestableSocketServer(config)
369369

370-
val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint =>
370+
val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
371371
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
372372
}.map(_.toJava)
373373

0 commit comments

Comments
 (0)