Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC}
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.metadata.DynamicConfigValidator
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.coordinator.group.GroupConfigManager
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig

import scala.collection.mutable
import scala.jdk.CollectionConverters._

/**
* The validator that the controller uses for dynamic configuration changes.
Expand All @@ -44,7 +47,33 @@ import scala.collection.mutable
* in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
* as the others. It is not persisted to the metadata log.
*/
class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
class ControllerConfigurationValidator(kafkaConfig: KafkaConfig)
extends ConfigurationValidator with DynamicConfigValidator {
private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {
val topicConfigs = LogConfig.nonInternalConfigNames.asScala.toSet
val brokerConfigs = DynamicConfig.Broker.names.asScala.toSet
val clientMetricsConfigs = ClientMetricsConfigs.configDef().names.asScala.toSet
val groupConfigs = GroupConfig.configDef().names.asScala.toSet
// Quota configs can be used with different resource types, so we include them for all types
val allQuotaConfigs = QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala ++
QuotaConfig.userAndClientQuotaConfigs().names.asScala ++
QuotaConfig.ipConfigs.names.asScala

Map(
ConfigResource.Type.TOPIC -> (topicConfigs ++ allQuotaConfigs).asJava,
ConfigResource.Type.BROKER -> (brokerConfigs ++ allQuotaConfigs).asJava,
ConfigResource.Type.CLIENT_METRICS -> (clientMetricsConfigs ++ allQuotaConfigs).asJava,
ConfigResource.Type.GROUP -> (groupConfigs ++ allQuotaConfigs).asJava
)
}

override def isValidConfig(resourceType: ConfigResource.Type, configName: String): Boolean = {
validConfigsByType.get(resourceType) match {
case Some(configs) => configs.contains(configName)
case None => false
}
}

private def validateTopicName(
name: String
): Unit = {
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class ControllerServer(
setCreateTopicPolicy(createTopicPolicy.toJava).
setAlterConfigPolicy(alterConfigPolicy.toJava).
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
setConfigValidator(sharedServer.dynamicConfigValidator).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
import org.apache.kafka.metadata.ListenerInfo
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.{DynamicConfigValidator, ListenerInfo, MetadataRecordSerde}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
Expand Down Expand Up @@ -112,6 +111,7 @@ class SharedServer(
private var usedByController: Boolean = false
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
val dynamicConfigValidator: DynamicConfigValidator = new ControllerConfigurationValidator(brokerConfig)

// Factory for creating request handler pools with shared aggregate thread counter
val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
Expand Down Expand Up @@ -323,7 +323,8 @@ class SharedServer(
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
setMetrics(metadataLoaderMetrics)
setMetrics(metadataLoaderMetrics).
setConfigValidator(dynamicConfigValidator)
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().
setNodeId(nodeId).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.DynamicConfigValidator;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class ConfigurationControlManager {
private final Map<String, Object> staticConfig;
private final ConfigResource currentController;
private final FeatureControlManager featureControl;
private final DynamicConfigValidator dynamicConfigValidator;

static class Builder {
private LogContext logContext = null;
Expand All @@ -87,6 +89,7 @@ static class Builder {
private Map<String, Object> staticConfig = Map.of();
private int nodeId = 0;
private FeatureControlManager featureControl = null;
private DynamicConfigValidator dynamicConfigValidator = null;

Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
Expand Down Expand Up @@ -133,6 +136,11 @@ Builder setFeatureControl(FeatureControlManager featureControl) {
return this;
}

Builder setConfigValidator(DynamicConfigValidator dynamicConfigValidator) {
this.dynamicConfigValidator = dynamicConfigValidator;
return this;
}

ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
Expand All @@ -151,7 +159,8 @@ ConfigurationControlManager build() {
validator,
staticConfig,
nodeId,
featureControl);
featureControl,
dynamicConfigValidator);
}
}

Expand All @@ -163,7 +172,8 @@ private ConfigurationControlManager(LogContext logContext,
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId,
FeatureControlManager featureControl
FeatureControlManager featureControl,
DynamicConfigValidator dynamicConfigValidator
) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
Expand All @@ -176,6 +186,7 @@ private ConfigurationControlManager(LogContext logContext,
this.staticConfig = Map.copyOf(staticConfig);
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
this.featureControl = featureControl;
this.dynamicConfigValidator = dynamicConfigValidator;
}

SnapshotRegistry snapshotRegistry() {
Expand Down Expand Up @@ -327,6 +338,13 @@ private ApiError validateAlterConfig(
Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new HashMap<>();
TimelineHashMap<String, String> existingConfigsSnapshot = configData.get(configResource);
if (existingConfigsSnapshot != null) {
if (dynamicConfigValidator != null) {
for (String name : existingConfigsSnapshot.keySet()) {
if (!dynamicConfigValidator.isValidConfig(configResource.type(), name)) {
existingConfigsSnapshot.remove(name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we're ignoring any invalid existing config by removing it from the existingConfigsMap prior to processing it in validator.validate(configResource, allConfigs, existingConfigsMap);

I'm thinking it might work out to just incorporate this logic directly into ControllerConfigurationValidator#isValidConfig

Copy link
Contributor Author

@0xffff-zhiyan 0xffff-zhiyan Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can. But ControllerConfigurationValidator is initialized in ControllerServer.

setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).

How can we pass it through from SharedServer all the way down to ConfigurationDelta?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that isValidConfig() and validate() are methods in the same implementation, you can now have validate() check for isValidConfig() before verifying existingConfigs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or, there might be an argument for why we should just avoid throwing errors on existingConfigs when validating alter config requests to begin with. (ControllerConfigurationValidator#validate is only called when validating alter config requests - we can simply skip checking existingConfigs in that method altogether if folks agree on this)

}
}
}
allConfigs.putAll(existingConfigsSnapshot);
existingConfigsMap.putAll(existingConfigsSnapshot);
}
Expand Down Expand Up @@ -509,6 +527,12 @@ private List<String> getParts(String value, String key, ConfigResource configRes
public void replay(ConfigRecord record) {
Type type = Type.forId(record.resourceType());
ConfigResource configResource = new ConfigResource(type, record.resourceName());

// Filter out invalid configurations using DynamicConfigValidator
if (dynamicConfigValidator != null && !dynamicConfigValidator.isValidConfig(configResource.type(), record.name())) {
return;
}

TimelineHashMap<String, String> configs = configData.get(configResource);
if (configs == null) {
configs = new TimelineHashMap<>(snapshotRegistry, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.DynamicConfigValidator;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.VersionRange;
Expand Down Expand Up @@ -209,6 +210,7 @@ public static class Builder {
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
private DynamicConfigValidator dynamicConfigValidator = null;
private Map<String, Object> staticConfig = Map.of();
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH;
Expand Down Expand Up @@ -345,6 +347,11 @@ public Builder setConfigurationValidator(ConfigurationValidator configurationVal
return this;
}

public Builder setConfigValidator(DynamicConfigValidator dynamicConfigValidator) {
this.dynamicConfigValidator = dynamicConfigValidator;
return this;
}

public Builder setStaticConfig(Map<String, Object> staticConfig) {
this.staticConfig = staticConfig;
return this;
Expand Down Expand Up @@ -436,6 +443,7 @@ public QuorumController build() throws Exception {
createTopicPolicy,
alterConfigPolicy,
configurationValidator,
dynamicConfigValidator,
staticConfig,
bootstrapMetadata,
maxRecordsPerBatch,
Expand Down Expand Up @@ -1486,6 +1494,7 @@ private QuorumController(
Optional<CreateTopicPolicy> createTopicPolicy,
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator configurationValidator,
DynamicConfigValidator dynamicConfigValidator,
Map<String, Object> staticConfig,
BootstrapMetadata bootstrapMetadata,
int maxRecordsPerBatch,
Expand Down Expand Up @@ -1548,6 +1557,7 @@ private QuorumController(
setStaticConfig(staticConfig).
setNodeId(nodeId).
setFeatureControl(featureControl).
setConfigValidator(dynamicConfigValidator).
build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.image;

import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.metadata.DynamicConfigValidator;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,9 +32,11 @@
public final class ConfigurationDelta {
private final ConfigurationImage image;
private final Map<String, Optional<String>> changes = new HashMap<>();
private final DynamicConfigValidator dynamicConfigValidator;

public ConfigurationDelta(ConfigurationImage image) {
public ConfigurationDelta(ConfigurationImage image, DynamicConfigValidator dynamicConfigValidator) {
this.image = image;
this.dynamicConfigValidator = dynamicConfigValidator;
}

public void finishSnapshot() {
Expand All @@ -45,6 +48,9 @@ public void finishSnapshot() {
}

public void replay(ConfigRecord record) {
if (dynamicConfigValidator != null && !dynamicConfigValidator.isValidConfig(image.resource().type(), record.name())) {
return;
}
changes.put(record.name(), Optional.ofNullable(record.value()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.metadata.DynamicConfigValidator;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.HashMap;
Expand All @@ -34,9 +35,15 @@
public final class ConfigurationsDelta {
private final ConfigurationsImage image;
private final Map<ConfigResource, ConfigurationDelta> changes = new HashMap<>();
private final DynamicConfigValidator dynamicConfigValidator;

public ConfigurationsDelta(ConfigurationsImage image) {
this(image, null);
}

public ConfigurationsDelta(ConfigurationsImage image, DynamicConfigValidator dynamicConfigValidator) {
this.image = image;
this.dynamicConfigValidator = dynamicConfigValidator;
}

public Map<ConfigResource, ConfigurationDelta> changes() {
Expand All @@ -48,7 +55,7 @@ public void finishSnapshot() {
ConfigResource resource = entry.getKey();
ConfigurationImage configImage = entry.getValue();
ConfigurationDelta configDelta = changes.computeIfAbsent(resource,
__ -> new ConfigurationDelta(configImage));
__ -> new ConfigurationDelta(configImage, dynamicConfigValidator));
configDelta.finishSnapshot();
}
}
Expand All @@ -63,7 +70,7 @@ public void replay(ConfigRecord record) {
ConfigurationImage configImage = image.resourceData().getOrDefault(resource,
new ConfigurationImage(resource, Map.of()));
ConfigurationDelta delta = changes.computeIfAbsent(resource,
__ -> new ConfigurationDelta(configImage));
__ -> new ConfigurationDelta(configImage, dynamicConfigValidator));
delta.replay(record);
}

Expand All @@ -73,7 +80,7 @@ public void replay(RemoveTopicRecord record, String topicName) {
if (image.resourceData().containsKey(resource)) {
ConfigurationImage configImage = image.resourceData().get(resource);
ConfigurationDelta delta = changes.computeIfAbsent(resource,
__ -> new ConfigurationDelta(configImage));
__ -> new ConfigurationDelta(configImage, dynamicConfigValidator));
delta.deleteAll();
}
}
Expand Down
18 changes: 16 additions & 2 deletions metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.metadata.DynamicConfigValidator;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.Optional;
Expand All @@ -51,19 +52,27 @@
public final class MetadataDelta {
public static class Builder {
private MetadataImage image = MetadataImage.EMPTY;
private DynamicConfigValidator dynamicConfigValidator = null;

public Builder setImage(MetadataImage image) {
this.image = image;
return this;
}

public Builder setConfigValidator(DynamicConfigValidator dynamicConfigValidator) {
this.dynamicConfigValidator = dynamicConfigValidator;
return this;
}

public MetadataDelta build() {
return new MetadataDelta(image);
return new MetadataDelta(image, dynamicConfigValidator);
}
}

private final MetadataImage image;

private final DynamicConfigValidator dynamicConfigValidator;

private FeaturesDelta featuresDelta = null;

private ClusterDelta clusterDelta = null;
Expand All @@ -83,7 +92,12 @@ public MetadataDelta build() {
private DelegationTokenDelta delegationTokenDelta = null;

public MetadataDelta(MetadataImage image) {
this(image, null);
}

public MetadataDelta(MetadataImage image, DynamicConfigValidator dynamicConfigValidator) {
this.image = image;
this.dynamicConfigValidator = dynamicConfigValidator;
}

public MetadataImage image() {
Expand Down Expand Up @@ -122,7 +136,7 @@ public ConfigurationsDelta configsDelta() {
}

public ConfigurationsDelta getOrCreateConfigsDelta() {
if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs());
if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs(), dynamicConfigValidator);
return configsDelta;
}

Expand Down
Loading
Loading