KAFKA-18225: ClientQuotaCallback#updateClusterMetadata is unsupported by kraft #18196

Merged: 56 commits into trunk from KAFKA-18225, Feb 10, 2025. The diff below reflects changes from 48 of the 56 commits.

Commits (56):
5fe6d1a  wip (m1a2st, Dec 15, 2024)
b3edea5  add clusterId to DynamicClientQuotaPublisher (m1a2st, Dec 15, 2024)
15d7dbb  Draft a version (m1a2st, Dec 15, 2024)
4123c70  revert test (m1a2st, Dec 16, 2024)
765d7f1  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Dec 16, 2024)
513b381  add new test (m1a2st, Dec 16, 2024)
74b1006  update the test (m1a2st, Dec 17, 2024)
d1cfe97  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Dec 18, 2024)
a4758e7  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Dec 31, 2024)
dee9a65  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 8, 2025)
cd853b6  resolve conflict (m1a2st, Jan 8, 2025)
48a8b41  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 9, 2025)
6bf41a7  addressed by comments (m1a2st, Jan 9, 2025)
3fe1d7c  addressed by comments (m1a2st, Jan 11, 2025)
ed50ea8  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 11, 2025)
f87db91  fix var name (m1a2st, Jan 12, 2025)
57fad78  update delete topic (m1a2st, Jan 13, 2025)
f468e55  addressed by comment (m1a2st, Jan 13, 2025)
0c136b7  addressed by comment (m1a2st, Jan 13, 2025)
ac9f55b  addressed by comment (m1a2st, Jan 13, 2025)
909d8f9  addressed by comment (m1a2st, Jan 13, 2025)
5e88ee1  update when updateClusterMetadata success (m1a2st, Jan 13, 2025)
b3365f0  addressed by comment (m1a2st, Jan 15, 2025)
8e36e2f  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 16, 2025)
977a603  update the doc (m1a2st, Jan 16, 2025)
de16d0e  addressed the comment (m1a2st, Jan 16, 2025)
9f04283  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 18, 2025)
830c99c  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 19, 2025)
3a3e409  fix the test (m1a2st, Jan 19, 2025)
e48ca88  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 20, 2025)
6f679f1  fix the conflict (m1a2st, Jan 20, 2025)
be432e6  separator logic (m1a2st, Jan 20, 2025)
af4cee5  add license (m1a2st, Jan 20, 2025)
39f542e  revert unused change (m1a2st, Jan 20, 2025)
0bab1c7  revert unused change (m1a2st, Jan 20, 2025)
1ea0067  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 21, 2025)
929e87a  wip (m1a2st, Jan 24, 2025)
101cbdc  addressed by comments (m1a2st, Jan 24, 2025)
9c28db2  add new test (m1a2st, Jan 24, 2025)
9514964  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 26, 2025)
498ab89  resolve the conflict (m1a2st, Jan 26, 2025)
c5fb945  fix the conflict (m1a2st, Jan 26, 2025)
c986339  fix the conflict (m1a2st, Jan 26, 2025)
9721c48  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 30, 2025)
294cb93  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 31, 2025)
3877941  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Feb 2, 2025)
2b527b4  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Feb 3, 2025)
3dc5afb  addressed by comment (m1a2st, Feb 3, 2025)
9469bff  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Feb 4, 2025)
71123fb  addressed by comment (m1a2st, Feb 4, 2025)
d902055  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Feb 5, 2025)
3673d78  resolve the conflict (m1a2st, Feb 5, 2025)
21cf98e  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Feb 5, 2025)
9451d69  update the test (m1a2st, Feb 5, 2025)
ee23d92  address all comments (m1a2st, Feb 6, 2025)
7924c03  update new line (m1a2st, Feb 6, 2025)
@@ -23,7 +23,7 @@
import java.util.Map;

/**
- * Quota callback interface for brokers that enables customization of client quota computation.
+ * Quota callback interface for brokers and controllers that enables customization of client quota computation.
*/
public interface ClientQuotaCallback extends Configurable {

@@ -89,11 +89,11 @@ public interface ClientQuotaCallback extends Configurable {
boolean quotaResetRequired(ClientQuotaType quotaType);

/**
- * Metadata update callback that is invoked whenever UpdateMetadata request is received from
- * the controller. This is useful if quota computation takes partitions into account.
+ * Metadata update callback that is invoked whenever the topic and cluster delta changed.

Review comment (Member): I'm not sure we want to mention cluster/topic deltas here; this is a public API and part of our javadoc. I think we can say something like: "This callback is invoked whenever the cluster metadata changes. This includes brokers added or removed, topics created or deleted, and partition leadership changes."

+ * This is useful if quota computation takes partitions into account.
+ * Topics that are being deleted will not be included in `cluster`.
  *
- * @param cluster Cluster metadata including partitions and their leaders if known
+ * @param cluster Cluster metadata including topic and cluster

Review comment (Member): I'm not sure I understand this change. I prefer the previous message.

* @return true if quotas have changed and metric configs may need to be updated
*/
boolean updateClusterMetadata(Cluster cluster);
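To make the contract concrete, here is a minimal hypothetical sketch (not part of this PR) of the partition-aware bookkeeping an updateClusterMetadata implementation might do. The class and field names are illustrative; a real plugin, registered through the standard client.quota.callback.class server property (KIP-257), must also implement the interface's other methods (quotaMetricTags, quotaLimit, and so on).

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

// Hypothetical helper: tracks how many partition leaders each broker hosts,
// the kind of state a partition-aware quota callback would rebuild here.
public class PartitionLeaderCounter {
    private volatile Map<Integer, Integer> leaderCountsByBroker = new HashMap<>();

    // Mirrors the updateClusterMetadata contract: return true when quotas may
    // have changed, so that the quota metric configs get refreshed.
    public boolean updateClusterMetadata(Cluster cluster) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String topic : cluster.topics()) {
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                Node leader = partition.leader();
                if (leader != null) {
                    counts.merge(leader.id(), 1, Integer::sum);
                }
            }
        }
        boolean changed = !counts.equals(leaderCountsByBroker);
        leaderCountsByBroker = counts;
        return changed;
    }
}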
core/src/main/scala/kafka/server/BrokerServer.scala (9 additions, 1 deletion)
@@ -507,7 +507,15 @@ class BrokerServer(
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
-        clientQuotaMetadataManager),
+        clientQuotaMetadataManager,
+      ),
+      new DynamicTopicClusterQuotaPublisher(
+        clusterId,
+        config,
+        sharedServer.metadataPublishingFaultHandler,
+        "broker",
+        quotaManagers,
+      ),
new ScramPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
core/src/main/scala/kafka/server/ControllerServer.scala (12 additions, 2 deletions)
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
-import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
+import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
@@ -332,7 +332,17 @@ class ControllerServer(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
-        clientQuotaMetadataManager))
+        clientQuotaMetadataManager
+      ))
+
+    // Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics.
+    metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
+      clusterId,
+      config,
+      sharedServer.metadataPublishingFaultHandler,
+      "controller",
+      quotaManagers,
+    ))

// Set up the SCRAM publisher.
metadataPublishers.add(new ScramPublisher(
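With the publisher registered on both broker and controller, a callback configured on either node type now observes metadata updates. As a reminder (not part of this diff), a custom callback is activated through the standard client.quota.callback.class server property; the callback class name below is a placeholder.

import java.util.Properties;

public final class QuotaCallbackConfigSketch {
    // Sketch: the server property that enables a ClientQuotaCallback plugin.
    // "com.example.MyQuotaCallback" is a hypothetical class name.
    public static Properties serverProps() {
        Properties props = new Properties();
        props.put("client.quota.callback.class", "com.example.MyQuotaCallback");
        return props;
    }
}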
core/src/main/scala/kafka/server/MetadataCache.scala (98 additions, 2 deletions)
@@ -19,15 +19,20 @@ package kafka.server

import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import org.apache.kafka.admin.BrokerMetadata
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, PartitionRegistration}
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}

import java.util
import java.util.Collections
+import java.util.concurrent.ThreadLocalRandom
import java.util.function.Supplier
import scala.collection._
+import scala.jdk.CollectionConverters.CollectionHasAsScala

trait MetadataCache extends ConfigRepository {
/**
@@ -144,4 +149,95 @@ object MetadataCache {
): KRaftMetadataCache = {
new KRaftMetadataCache(brokerId, kraftVersionSupplier)
}

  def toCluster(clusterId: String, image: MetadataImage): Cluster = {
    val brokerToNodes = new util.HashMap[Integer, java.util.List[Node]]

Review comment (Member): We can drop the java. prefix in java.util.List.

    image.cluster().brokers()
      .values().stream()
      .filter(broker => !broker.fenced())

Review comment (Member): -> .filter(!_.fenced()), but maybe you wrote it this way with the future conversion to Java in mind. In that case we can keep the code.

Reply (Contributor, author): Yes, I want to keep this because we will need to convert it to Java in the future.

      .forEach { broker => brokerToNodes.put(broker.id(), broker.nodes()) }

    def getNodes(id: Int): java.util.List[Node] = brokerToNodes.get(id)

Review comment (Member): Again, drop the java. prefix.

    val partitionInfos = new util.ArrayList[PartitionInfo]
    val internalTopics = new util.HashSet[String]

    def toArray(replicas: Array[Int]): Array[Node] = {
      util.Arrays.stream(replicas)
        .mapToObj(replica => getNodes(replica))
        .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))
    }

    val topicImages = image.topics().topicsByName().values()
    if (topicImages != null) {
      topicImages.forEach { topic =>
        topic.partitions().forEach { (key, value) =>
          val partitionId = key
          val partition = value
          val nodes = getNodes(partition.leader)
          if (nodes != null) {
            nodes.forEach(node => {
              partitionInfos.add(new PartitionInfo(topic.name(),
                partitionId,
                node,
                toArray(partition.replicas),
                toArray(partition.isr),
                getOfflineReplicas(image, partition).stream()
                  .map(replica => getNodes(replica))
                  .flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))))
            })
            if (Topic.isInternal(topic.name())) {
              internalTopics.add(topic.name())
            }
          }
        }
      }
    }

    val controllerNode = getNodes(getRandomAliveBroker(image).getOrElse(-1)) match {
      case null => Node.noNode()
      case nodes => nodes.get(0)
    }
    // Note: the constructor of Cluster does not allow us to reference unregistered nodes.
    // So, for example, if partition foo-0 has replicas [1, 2] but broker 2 is not
    // registered, we pass its replicas as [1, -1]. This doesn't make a lot of sense, but
    // we are duplicating the behavior of ZkMetadataCache, for now.
    new Cluster(clusterId, brokerToNodes.values().stream().flatMap(n => n.stream()).collect(util.stream.Collectors.toList()),
      partitionInfos, Collections.emptySet(), internalTopics, controllerNode)
  }

  private def getOfflineReplicas(image: MetadataImage,
                                 partition: PartitionRegistration,
                                 listenerName: ListenerName = null): util.List[Integer] = {
    val offlineReplicas = new util.ArrayList[Integer](0)
    for (brokerId <- partition.replicas) {
      Option(image.cluster().broker(brokerId)) match {
        case None => offlineReplicas.add(brokerId)
        case Some(broker) => if (listenerName == null || isReplicaOffline(partition, listenerName, broker)) {
          offlineReplicas.add(brokerId)
        }
      }
    }
    offlineReplicas
  }

  private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
    broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)

  private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
    !broker.hasOnlineDir(partition.directory(broker.id()))

  private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
    val aliveBrokers = getAliveBrokers(image).toList
    if (aliveBrokers.isEmpty) {
      None
    } else {
      Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
    }
  }

  private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = {
    image.cluster().brokers().values().asScala.filterNot(_.fenced()).
      map(b => new BrokerMetadata(b.id, b.rack))
  }
}
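To visualize what toCluster assembles, here is a small illustrative Java sketch (not from the PR) of an equivalent Cluster built by hand: a Node per unfenced broker registration, one PartitionInfo per partition, the internal-topic name set, and a controller stand-in chosen from the alive brokers. All hostnames and ids are made up.

import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public final class ToClusterShapeSketch {
    public static Cluster example() {
        // Two unfenced brokers, as brokerToNodes would collect them.
        Node broker1 = new Node(1, "broker1.example", 9092);
        Node broker2 = new Node(2, "broker2.example", 9092);
        // One partition of topic "foo" led by broker1, replicated on both.
        PartitionInfo foo0 = new PartitionInfo("foo", 0, broker1,
                new Node[]{broker1, broker2},   // replicas
                new Node[]{broker1, broker2});  // in-sync replicas
        // Mirrors: new Cluster(clusterId, nodes, partitionInfos,
        //                      unauthorizedTopics, internalTopics, controller)
        return new Cluster("cluster-id-abc", List.of(broker1, broker2),
                List.of(foo0), Collections.emptySet(),
                Collections.emptySet(), broker1);
    }
}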
@@ -71,6 +71,7 @@ class BrokerMetadataPublisher(
shareCoordinator: Option[ShareCoordinator],
var dynamicConfigPublisher: DynamicConfigPublisher,
dynamicClientQuotaPublisher: DynamicClientQuotaPublisher,
+  dynamicTopicClusterQuotaPublisher: DynamicTopicClusterQuotaPublisher,
scramPublisher: ScramPublisher,
delegationTokenPublisher: DelegationTokenPublisher,
aclPublisher: AclPublisher,
@@ -199,6 +200,9 @@
// Apply client quotas delta.
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)

+    // Apply topic or cluster quotas delta.
+    dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)

// Apply SCRAM delta.
scramPublisher.onMetadataUpdate(delta, newImage)

New file: DynamicTopicClusterQuotaPublisher.scala (69 lines)
@@ -0,0 +1,69 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package kafka.server.metadata

import kafka.server.{KafkaConfig, MetadataCache}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.server.fault.FaultHandler

/**
 * Publishing dynamic topic or cluster changes to the client quota manager.
 * Temporary solution since Cluster objects are immutable and costly to update for every metadata change.
 * See KAFKA-18239 to trace the issue.
 */
class DynamicTopicClusterQuotaPublisher (
  clusterId: String,
  conf: KafkaConfig,
  faultHandler: FaultHandler,
  nodeType: String,
  quotaManagers: QuotaManagers
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
  logIdent = s"[${name()}] "

  override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}"

  override def onMetadataUpdate(
    delta: MetadataDelta,
    newImage: MetadataImage,
    manifest: LoaderManifest
  ): Unit = {
    onMetadataUpdate(delta, newImage)
  }

  def onMetadataUpdate(
    delta: MetadataDelta,
    newImage: MetadataImage,
  ): Unit = {
    val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
Review comment (Member): Can we only compute this val in the catch block?

    try {
      quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
        if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
          val cluster = MetadataCache.toCluster(clusterId, newImage)
          if (clientQuotaCallback.updateClusterMetadata(cluster)) {
            quotaManagers.fetch.updateQuotaMetricConfigs()
            quotaManagers.produce.updateQuotaMetricConfigs()
            quotaManagers.request.updateQuotaMetricConfigs()
            quotaManagers.controllerMutation.updateQuotaMetricConfigs()
          }
        }
      })
    } catch {
      case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
        s"publishing dynamic topic or cluster changes from $deltaName", t)
    }
  }
}
Review comment (Member): Nit: missing new line.
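For reference, the MetadataPublisher contract that this class implements boils down to a name and an update hook. Here is a bare-bones Java sketch of the same shape (illustrative only; it assumes name() and the three-argument onMetadataUpdate are the only methods that must be implemented, as the Scala class above suggests):

import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;

// Skeleton publisher: DynamicTopicClusterQuotaPublisher follows this shape,
// reacting only when delta.topicsDelta() or delta.clusterDelta() is non-null.
public class SkeletonPublisher implements MetadataPublisher {
    @Override
    public String name() {
        return "SkeletonPublisher";
    }

    @Override
    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
        if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
            // React to topic or cluster metadata changes here.
        }
    }
}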
