diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index f163a2739aea1..2256425f1047b 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -22,13 +22,16 @@ import java.util.Properties import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC} import org.apache.kafka.controller.ConfigurationValidator +import org.apache.kafka.metadata.DynamicConfigValidator import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException} import org.apache.kafka.common.internals.Topic -import org.apache.kafka.coordinator.group.GroupConfigManager +import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager} +import org.apache.kafka.server.config.QuotaConfig import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.LogConfig import scala.collection.mutable +import scala.jdk.CollectionConverters._ /** * The validator that the controller uses for dynamic configuration changes. @@ -44,7 +47,33 @@ import scala.collection.mutable * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense * as the others. It is not persisted to the metadata log. */ -class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator { +class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) + extends ConfigurationValidator with DynamicConfigValidator { + private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = { + val topicConfigs = LogConfig.nonInternalConfigNames.asScala.toSet + val brokerConfigs = DynamicConfig.Broker.names.asScala.toSet + val clientMetricsConfigs = ClientMetricsConfigs.configDef().names.asScala.toSet + val groupConfigs = GroupConfig.configDef().names.asScala.toSet + // Quota configs can be used with different resource types, so we include them for all types + val allQuotaConfigs = QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala ++ + QuotaConfig.userAndClientQuotaConfigs().names.asScala ++ + QuotaConfig.ipConfigs.names.asScala + + Map( + ConfigResource.Type.TOPIC -> (topicConfigs ++ allQuotaConfigs).asJava, + ConfigResource.Type.BROKER -> (brokerConfigs ++ allQuotaConfigs).asJava, + ConfigResource.Type.CLIENT_METRICS -> (clientMetricsConfigs ++ allQuotaConfigs).asJava, + ConfigResource.Type.GROUP -> (groupConfigs ++ allQuotaConfigs).asJava + ) + } + + override def isValidConfig(resourceType: ConfigResource.Type, configName: String): Boolean = { + validConfigsByType.get(resourceType) match { + case Some(configs) => configs.contains(configName) + case None => false + } + } + private def validateTopicName( name: String ): Unit = { diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 81af28dc4958d..65f2ab110db88 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -246,6 +246,7 @@ class ControllerServer( setCreateTopicPolicy(createTopicPolicy.toJava). setAlterConfigPolicy(alterConfigPolicy.toJava). setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)). + setConfigValidator(sharedServer.dynamicConfigValidator). setStaticConfig(config.originals). setBootstrapMetadata(bootstrapMetadata). setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler). diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 3acfc9bf0b9f9..e82a7ba58228f 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -30,8 +30,7 @@ import org.apache.kafka.image.loader.MetadataLoader import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator} -import org.apache.kafka.metadata.ListenerInfo -import org.apache.kafka.metadata.MetadataRecordSerde +import org.apache.kafka.metadata.{DynamicConfigValidator, ListenerInfo, MetadataRecordSerde} import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble import org.apache.kafka.raft.Endpoints import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} @@ -112,6 +111,7 @@ class SharedServer( private var usedByController: Boolean = false val brokerConfig = new KafkaConfig(sharedServerConfig.props, false) val controllerConfig = new KafkaConfig(sharedServerConfig.props, false) + val dynamicConfigValidator: DynamicConfigValidator = new ControllerConfigurationValidator(brokerConfig) // Factory for creating request handler pools with shared aggregate thread counter val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory() @@ -323,7 +323,8 @@ class SharedServer( setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). setFaultHandler(metadataLoaderFaultHandler). setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()). - setMetrics(metadataLoaderMetrics) + setMetrics(metadataLoaderMetrics). + setConfigValidator(dynamicConfigValidator) loader = loaderBuilder.build() snapshotEmitter = new SnapshotEmitter.Builder(). setNodeId(nodeId). diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4a0219d175949..af0345acfe05f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; @@ -76,6 +77,7 @@ public class ConfigurationControlManager { private final Map staticConfig; private final ConfigResource currentController; private final FeatureControlManager featureControl; + private final DynamicConfigValidator dynamicConfigValidator; static class Builder { private LogContext logContext = null; @@ -87,6 +89,7 @@ static class Builder { private Map staticConfig = Map.of(); private int nodeId = 0; private FeatureControlManager featureControl = null; + private DynamicConfigValidator dynamicConfigValidator = null; Builder setLogContext(LogContext logContext) { this.logContext = logContext; @@ -133,6 +136,11 @@ Builder setFeatureControl(FeatureControlManager featureControl) { return this; } + Builder setConfigValidator(DynamicConfigValidator dynamicConfigValidator) { + this.dynamicConfigValidator = dynamicConfigValidator; + return this; + } + ConfigurationControlManager build() { if (logContext == null) logContext = new LogContext(); if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); @@ -151,7 +159,8 @@ ConfigurationControlManager build() { validator, staticConfig, nodeId, - featureControl); + featureControl, + dynamicConfigValidator); } } @@ -163,7 +172,8 @@ private ConfigurationControlManager(LogContext logContext, ConfigurationValidator validator, Map staticConfig, int nodeId, - FeatureControlManager featureControl + FeatureControlManager featureControl, + DynamicConfigValidator dynamicConfigValidator ) { this.log = logContext.logger(ConfigurationControlManager.class); this.snapshotRegistry = snapshotRegistry; @@ -176,6 +186,7 @@ private ConfigurationControlManager(LogContext logContext, this.staticConfig = Map.copyOf(staticConfig); this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId)); this.featureControl = featureControl; + this.dynamicConfigValidator = dynamicConfigValidator; } SnapshotRegistry snapshotRegistry() { @@ -327,6 +338,13 @@ private ApiError validateAlterConfig( Map alteredConfigsForAlterConfigPolicyCheck = new HashMap<>(); TimelineHashMap existingConfigsSnapshot = configData.get(configResource); if (existingConfigsSnapshot != null) { + if (dynamicConfigValidator != null) { + for (String name : existingConfigsSnapshot.keySet()) { + if (!dynamicConfigValidator.isValidConfig(configResource.type(), name)) { + existingConfigsSnapshot.remove(name); + } + } + } allConfigs.putAll(existingConfigsSnapshot); existingConfigsMap.putAll(existingConfigsSnapshot); } @@ -509,6 +527,12 @@ private List getParts(String value, String key, ConfigResource configRes public void replay(ConfigRecord record) { Type type = Type.forId(record.resourceType()); ConfigResource configResource = new ConfigResource(type, record.resourceName()); + + // Filter out invalid configurations using DynamicConfigValidator + if (dynamicConfigValidator != null && !dynamicConfigValidator.isValidConfig(configResource.type(), record.name())) { + return; + } + TimelineHashMap configs = configData.get(configResource); if (configs == null) { configs = new TimelineHashMap<>(snapshotRegistry, 0); diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index fc6e889ba3e68..71787073e606e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -98,6 +98,7 @@ import org.apache.kafka.deferred.DeferredEventQueue; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.metadata.FinalizedControllerFeatures; import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.VersionRange; @@ -209,6 +210,7 @@ public static class Builder { private Optional createTopicPolicy = Optional.empty(); private Optional alterConfigPolicy = Optional.empty(); private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP; + private DynamicConfigValidator dynamicConfigValidator = null; private Map staticConfig = Map.of(); private BootstrapMetadata bootstrapMetadata = null; private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH; @@ -345,6 +347,11 @@ public Builder setConfigurationValidator(ConfigurationValidator configurationVal return this; } + public Builder setConfigValidator(DynamicConfigValidator dynamicConfigValidator) { + this.dynamicConfigValidator = dynamicConfigValidator; + return this; + } + public Builder setStaticConfig(Map staticConfig) { this.staticConfig = staticConfig; return this; @@ -436,6 +443,7 @@ public QuorumController build() throws Exception { createTopicPolicy, alterConfigPolicy, configurationValidator, + dynamicConfigValidator, staticConfig, bootstrapMetadata, maxRecordsPerBatch, @@ -1486,6 +1494,7 @@ private QuorumController( Optional createTopicPolicy, Optional alterConfigPolicy, ConfigurationValidator configurationValidator, + DynamicConfigValidator dynamicConfigValidator, Map staticConfig, BootstrapMetadata bootstrapMetadata, int maxRecordsPerBatch, @@ -1548,6 +1557,7 @@ private QuorumController( setStaticConfig(staticConfig). setNodeId(nodeId). setFeatureControl(featureControl). + setConfigValidator(dynamicConfigValidator). build(); this.producerIdControlManager = new ProducerIdControlManager.Builder(). setLogContext(logContext). diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java index dc550d8c72aa1..a6a2f8c780553 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java @@ -18,6 +18,7 @@ package org.apache.kafka.image; import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.metadata.DynamicConfigValidator; import java.util.HashMap; import java.util.Map; @@ -31,9 +32,11 @@ public final class ConfigurationDelta { private final ConfigurationImage image; private final Map> changes = new HashMap<>(); + private final DynamicConfigValidator dynamicConfigValidator; - public ConfigurationDelta(ConfigurationImage image) { + public ConfigurationDelta(ConfigurationImage image, DynamicConfigValidator dynamicConfigValidator) { this.image = image; + this.dynamicConfigValidator = dynamicConfigValidator; } public void finishSnapshot() { @@ -45,6 +48,9 @@ public void finishSnapshot() { } public void replay(ConfigRecord record) { + if (dynamicConfigValidator != null && !dynamicConfigValidator.isValidConfig(image.resource().type(), record.name())) { + return; + } changes.put(record.name(), Optional.ofNullable(record.value())); } diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java index 0b3fcbb386722..4b3875de5a27e 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.server.common.MetadataVersion; import java.util.HashMap; @@ -34,9 +35,15 @@ public final class ConfigurationsDelta { private final ConfigurationsImage image; private final Map changes = new HashMap<>(); + private final DynamicConfigValidator dynamicConfigValidator; public ConfigurationsDelta(ConfigurationsImage image) { + this(image, null); + } + + public ConfigurationsDelta(ConfigurationsImage image, DynamicConfigValidator dynamicConfigValidator) { this.image = image; + this.dynamicConfigValidator = dynamicConfigValidator; } public Map changes() { @@ -48,7 +55,7 @@ public void finishSnapshot() { ConfigResource resource = entry.getKey(); ConfigurationImage configImage = entry.getValue(); ConfigurationDelta configDelta = changes.computeIfAbsent(resource, - __ -> new ConfigurationDelta(configImage)); + __ -> new ConfigurationDelta(configImage, dynamicConfigValidator)); configDelta.finishSnapshot(); } } @@ -63,7 +70,7 @@ public void replay(ConfigRecord record) { ConfigurationImage configImage = image.resourceData().getOrDefault(resource, new ConfigurationImage(resource, Map.of())); ConfigurationDelta delta = changes.computeIfAbsent(resource, - __ -> new ConfigurationDelta(configImage)); + __ -> new ConfigurationDelta(configImage, dynamicConfigValidator)); delta.replay(record); } @@ -73,7 +80,7 @@ public void replay(RemoveTopicRecord record, String topicName) { if (image.resourceData().containsKey(resource)) { ConfigurationImage configImage = image.resourceData().get(resource); ConfigurationDelta delta = changes.computeIfAbsent(resource, - __ -> new ConfigurationDelta(configImage)); + __ -> new ConfigurationDelta(configImage, dynamicConfigValidator)); delta.deleteAll(); } } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index b934d10f6d10d..02127a1af2297 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.metadata.UserScramCredentialRecord; import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.server.common.MetadataVersion; import java.util.Optional; @@ -51,19 +52,27 @@ public final class MetadataDelta { public static class Builder { private MetadataImage image = MetadataImage.EMPTY; + private DynamicConfigValidator dynamicConfigValidator = null; public Builder setImage(MetadataImage image) { this.image = image; return this; } + public Builder setConfigValidator(DynamicConfigValidator dynamicConfigValidator) { + this.dynamicConfigValidator = dynamicConfigValidator; + return this; + } + public MetadataDelta build() { - return new MetadataDelta(image); + return new MetadataDelta(image, dynamicConfigValidator); } } private final MetadataImage image; + private final DynamicConfigValidator dynamicConfigValidator; + private FeaturesDelta featuresDelta = null; private ClusterDelta clusterDelta = null; @@ -83,7 +92,12 @@ public MetadataDelta build() { private DelegationTokenDelta delegationTokenDelta = null; public MetadataDelta(MetadataImage image) { + this(image, null); + } + + public MetadataDelta(MetadataImage image, DynamicConfigValidator dynamicConfigValidator) { this.image = image; + this.dynamicConfigValidator = dynamicConfigValidator; } public MetadataImage image() { @@ -122,7 +136,7 @@ public ConfigurationsDelta configsDelta() { } public ConfigurationsDelta getOrCreateConfigsDelta() { - if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs()); + if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs(), dynamicConfigValidator); return configsDelta; } diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index f0b7b004b15a5..089318e470b43 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -28,6 +28,7 @@ import org.apache.kafka.image.publisher.MetadataPublisher; import org.apache.kafka.image.writer.ImageReWriter; import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; import org.apache.kafka.raft.Batch; @@ -81,6 +82,7 @@ public static class Builder { private FaultHandler faultHandler = FaultHandlerException::new; private MetadataLoaderMetrics metrics = null; private Supplier highWaterMarkAccessor = null; + private DynamicConfigValidator dynamicConfigValidator = null; public Builder setNodeId(int nodeId) { this.nodeId = nodeId; @@ -112,6 +114,11 @@ public Builder setMetrics(MetadataLoaderMetrics metrics) { return this; } + public Builder setConfigValidator(DynamicConfigValidator dynamicConfigValidator) { + this.dynamicConfigValidator = dynamicConfigValidator; + return this; + } + public MetadataLoader build() { if (logContext == null) { logContext = new LogContext("[MetadataLoader id=" + nodeId + "] "); @@ -132,7 +139,8 @@ public MetadataLoader build() { threadNamePrefix, faultHandler, metrics, - highWaterMarkAccessor); + highWaterMarkAccessor, + dynamicConfigValidator); } } @@ -196,19 +204,26 @@ public MetadataLoader build() { */ private final KafkaEventQueue eventQueue; + /** + * Config validator for filtering invalid configurations. + */ + private final DynamicConfigValidator dynamicConfigValidator; + private MetadataLoader( Time time, LogContext logContext, String threadNamePrefix, FaultHandler faultHandler, MetadataLoaderMetrics metrics, - Supplier highWaterMarkAccessor + Supplier highWaterMarkAccessor, + DynamicConfigValidator dynamicConfigValidator ) { this.log = logContext.logger(MetadataLoader.class); this.time = time; this.faultHandler = faultHandler; this.metrics = metrics; this.highWaterMarkAccessor = highWaterMarkAccessor; + this.dynamicConfigValidator = dynamicConfigValidator; this.uninitializedPublishers = new LinkedHashMap<>(); this.publishers = new LinkedHashMap<>(); this.image = MetadataImage.EMPTY; @@ -296,6 +311,7 @@ void initializeNewPublishers() { // haven't seen anything previously. MetadataDelta delta = new MetadataDelta.Builder(). setImage(MetadataImage.EMPTY). + setConfigValidator(dynamicConfigValidator). build(); ImageReWriter writer = new ImageReWriter(delta); image.write(writer, new ImageWriterOptions.Builder(image.features().metadataVersionOrThrow()). diff --git a/metadata/src/main/java/org/apache/kafka/metadata/DynamicConfigValidator.java b/metadata/src/main/java/org/apache/kafka/metadata/DynamicConfigValidator.java new file mode 100644 index 0000000000000..4849ace6032bc --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/DynamicConfigValidator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.config.ConfigResource; + +/** + * Validator interface for checking if a configuration name is valid for a given resource type. + */ +@FunctionalInterface +public interface DynamicConfigValidator { + /** + * Check if a configuration name is valid for the given resource type. + * + * @param resourceType the type of resource (broker, topic, user, etc.) + * @param configName the name of the configuration + * @return true if the configuration is valid for the resource type, false otherwise + */ + boolean isValidConfig(ConfigResource.Type resourceType, String configName); +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 2c93d1100ecae..ea6cb9e892e8e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -561,4 +562,63 @@ private FeatureControlManager createFeatureControlManager() { setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel())); return featureControlManager; } + + @Test + public void testValidateAlterConfigFiltersInvalidExistingConfigs() { + Set validConfigs = Set.of("abc", "def"); + DynamicConfigValidator dynamicConfigValidator = (resourceType, configName) -> validConfigs.contains(configName); + + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setFeatureControl(createFeatureControlManager()). + setKafkaConfigSchema(SCHEMA). + setConfigValidator(dynamicConfigValidator). + build(); + + manager.replay(new ConfigRecord(). + setResourceType(TOPIC.id()).setResourceName("mytopic"). + setName("abc").setValue("value1")); // valid + manager.replay(new ConfigRecord(). + setResourceType(TOPIC.id()).setResourceName("mytopic"). + setName("invalid.config").setValue("should-be-filtered")); // invalid + + Map configs = manager.getConfigs(MYTOPIC); + assertTrue(configs.containsKey("abc"), "Valid config should be in configData"); + assertFalse(configs.containsKey("invalid.config"), "Invalid config should be filtered out from configData"); + + ControllerResult result = manager.incrementalAlterConfig( + MYTOPIC, + toMap(entry("def", entry(SET, "newValue"))), + false); + + assertEquals(ApiError.NONE, result.response()); + } + + @Test + public void testReplayFiltersInvalidConfigs() { + Set validConfigs = Set.of("abc", "def", "ghi"); + DynamicConfigValidator dynamicConfigValidator = (resourceType, configName) -> validConfigs.contains(configName); + + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(SCHEMA). + setConfigValidator(dynamicConfigValidator). + build(); + + // Replay valid configs + manager.replay(new ConfigRecord(). + setResourceType(TOPIC.id()).setResourceName("mytopic"). + setName("abc").setValue("value1")); + manager.replay(new ConfigRecord(). + setResourceType(TOPIC.id()).setResourceName("mytopic"). + setName("def").setValue("value2")); + + manager.replay(new ConfigRecord(). + setResourceType(TOPIC.id()).setResourceName("mytopic"). + setName("invalid.config").setValue("should-be-filtered")); + + Map configs = manager.getConfigs(MYTOPIC); + assertEquals(2, configs.size(), "Should only have valid configs"); + assertTrue(configs.containsKey("abc")); + assertTrue(configs.containsKey("def")); + assertFalse(configs.containsKey("invalid.config"), "Invalid config should not be in configData"); + } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 6300e1293e826..a446e58219c7c 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.image.writer.RecordListWriter; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -31,10 +32,13 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(value = 40) @@ -116,6 +120,74 @@ public void testImage2RoundTrip() { testToImage(IMAGE2); } + @Test + public void testConfigurationDeltaFiltering() { + Set validConfigs = Set.of("foo", "bar"); + DynamicConfigValidator dynamicConfigValidator = (resourceType, configName) -> validConfigs.contains(configName); + + Map initialConfigs = Map.of("foo", "value1"); + ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs); + + ConfigurationDelta delta = new ConfigurationDelta(image, dynamicConfigValidator); + // "bar" is valid, should be added + delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0") + .setName("bar").setValue("value2")); + // "qux" is invalid, should be filtered out in replay() + delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0") + .setName("qux").setValue("value3")); + + ConfigurationImage result = delta.apply(); + + // "foo" is valid and in initial configs, should be present + assertTrue(result.data().containsKey("foo")); + // "bar" is valid and was added via replay, should be present + assertTrue(result.data().containsKey("bar")); + // "qux" is invalid and was filtered out in replay(), should not be present + assertFalse(result.data().containsKey("qux")); + } + + @Test + public void testConfigurationDeltaWithoutFiltering() { + Map initialConfigs = Map.of("foo", "value1", "bar", "value2"); + ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs); + + ConfigurationDelta delta = new ConfigurationDelta(image, null); + delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0") + .setName("baz").setValue("value3")); + + ConfigurationImage result = delta.apply(); + + assertTrue(result.data().containsKey("foo")); + assertTrue(result.data().containsKey("bar")); + assertTrue(result.data().containsKey("baz")); + } + + @Test + public void testConfigurationDeltaFiltersInvalidConfigsFromBaseImage() { + Set validConfigs = Set.of("foo", "bar"); + DynamicConfigValidator dynamicConfigValidator = (resourceType, configName) -> validConfigs.contains(configName); + + Map initialConfigs = Map.of( + "foo", "value1", // valid + "bar", "value2", // valid + "invalid", "value3" // invalid + ); + ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs); + ConfigurationDelta delta = new ConfigurationDelta(image, dynamicConfigValidator); + + delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0") + .setName("foo").setValue("value1")); + delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0") + .setName("bar").setValue("value2")); + delta.finishSnapshot(); + + ConfigurationImage result = delta.apply(); + + assertTrue(result.data().containsKey("foo")); + assertTrue(result.data().containsKey("bar")); + assertFalse(result.data().containsKey("invalid")); + } + private static void testToImage(ConfigurationsImage image) { testToImage(image, Optional.empty()); } diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java index 7af9557381dad..1759524706569 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java @@ -17,11 +17,14 @@ package org.apache.kafka.shell; +import kafka.server.ControllerConfigurationValidator; +import kafka.server.KafkaConfig; import kafka.tools.TerseFailure; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.image.loader.MetadataLoader; +import org.apache.kafka.metadata.DynamicConfigValidator; import org.apache.kafka.metadata.util.SnapshotFileReader; import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.fault.LoggingFaultHandler; @@ -47,6 +50,7 @@ import java.nio.file.Path; import java.util.List; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.TimeUnit; /** @@ -148,10 +152,13 @@ public MetadataShell( private void initializeWithSnapshotFileReader() throws Exception { this.fileLock = takeDirectoryLockIfExists(parentParent(new File(snapshotPath))); + DynamicConfigValidator dynamicConfigValidator = new ControllerConfigurationValidator(new KafkaConfig(new Properties(), false)); + this.loader = new MetadataLoader.Builder(). setFaultHandler(faultHandler). setNodeId(-1). setHighWaterMarkAccessor(() -> snapshotFileReader.highWaterMark()). + setConfigValidator(dynamicConfigValidator). build(); snapshotFileReader = new SnapshotFileReader(snapshotPath, loader); snapshotFileReader.startup();