diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 43811be2f40db..76aa4a2b17502 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -566,6 +566,7 @@
+
@@ -583,7 +584,6 @@
-
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index f2b35d13e849c..43772802070af 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -34,7 +34,9 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -64,6 +66,8 @@
import org.apache.kafka.connect.util.TemporaryStage;
import org.apache.log4j.Level;
+import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,8 +89,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
@@ -94,9 +96,15 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_VERSION;
import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG;
+
/**
* Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
@@ -138,7 +146,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private final Time time;
protected final Loggers loggers;
- private final ConcurrentMap<String, Connector> tempConnectors = new ConcurrentHashMap<>();
+ private final CachedConnectors cachedConnectors;
public AbstractHerder(Worker worker,
String workerId,
@@ -157,6 +165,7 @@ public AbstractHerder(Worker worker,
this.connectorExecutor = Executors.newCachedThreadPool();
this.time = time;
this.loggers = new Loggers(time);
+ this.cachedConnectors = new CachedConnectors(worker.getPlugins());
}
@Override
@@ -398,6 +407,8 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
* may be null, in which case no validation will be performed under the assumption that the
* connector will inherit the converter settings from the worker. Some errors encountered
* during validation may be {@link ConfigValue#addErrorMessage(String) added} to this object
+ * @param pluginVersionValue the {@link ConfigValue} for the converter version property in the connector config;
+ *
* @param pluginInterface the interface for the plugin type
* (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
* may not be null
@@ -418,14 +429,18 @@ protected Map validateSourceConnectorConfig(SourceConnector
* @param <T> the plugin class to perform validation for
*/
+ @SuppressWarnings("unchecked")
private <T> ConfigInfos validateConverterConfig(
Map<String, String> connectorConfig,
ConfigValue pluginConfigValue,
+ ConfigValue pluginVersionValue,
Class<T> pluginInterface,
Function<T, ConfigDef> configDefAccessor,
String pluginName,
String pluginProperty,
+ String pluginVersionProperty,
Map<String, String> defaultProperties,
+ ClassLoader connectorLoader,
Function<String, TemporaryStage> reportStage
) {
Objects.requireNonNull(connectorConfig);
@@ -433,12 +448,15 @@ private ConfigInfos validateConverterConfig(
Objects.requireNonNull(configDefAccessor);
Objects.requireNonNull(pluginName);
Objects.requireNonNull(pluginProperty);
+ Objects.requireNonNull(pluginVersionProperty);
String pluginClass = connectorConfig.get(pluginProperty);
+ String pluginVersion = connectorConfig.get(pluginVersionProperty);
if (pluginClass == null
|| pluginConfigValue == null
|| !pluginConfigValue.errorMessages().isEmpty()
+ || !pluginVersionValue.errorMessages().isEmpty()
) {
// Either no custom converter was specified, or one was specified but there's a problem with it.
// No need to proceed any further.
@@ -448,11 +466,22 @@ private ConfigInfos validateConverterConfig(
T pluginInstance;
String stageDescription = "instantiating the connector's " + pluginName + " for validation";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
- pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+ VersionRange range = PluginUtils.connectorVersionRequirement(pluginVersion);
+ pluginInstance = (T) plugins().newPlugin(pluginClass, range, connectorLoader);
+ } catch (VersionedPluginLoadingException e) {
+ log.error("Failed to load {} class {} with version {}: {}", pluginName, pluginClass, pluginVersion, e);
+ pluginConfigValue.addErrorMessage(e.getMessage());
+ pluginVersionValue.addErrorMessage(e.getMessage());
+ return null;
} catch (ClassNotFoundException | RuntimeException e) {
log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
+ } catch (InvalidVersionSpecificationException e) {
+ // this should have been caught by prior validation logic
+ log.error("Invalid version range for {} class {}: {}", pluginName, pluginClass, pluginVersion, e);
+ pluginVersionValue.addErrorMessage(e.getMessage());
+ return null;
}
try {
@@ -494,55 +523,55 @@ private ConfigInfos validateConverterConfig(
}
}
- private ConfigInfos validateHeaderConverterConfig(
- Map<String, String> connectorConfig,
- ConfigValue headerConverterConfigValue,
+ private ConfigInfos validateAllConverterConfigs(
+ Map<String, String> connectorProps,
+ Map<String, ConfigValue> validatedConnectorConfig,
+ ClassLoader connectorLoader,
Function<String, TemporaryStage> reportStage
) {
- return validateConverterConfig(
- connectorConfig,
- headerConverterConfigValue,
+ String connType = connectorProps.get(CONNECTOR_CLASS_CONFIG);
+ // do custom converter-specific validation
+ ConfigInfos headerConverterConfigInfos = validateConverterConfig(
+ connectorProps,
+ validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG),
+ validatedConnectorConfig.get(HEADER_CONVERTER_VERSION_CONFIG),
HeaderConverter.class,
HeaderConverter::config,
"header converter",
HEADER_CONVERTER_CLASS_CONFIG,
+ HEADER_CONVERTER_VERSION_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()),
+ connectorLoader,
reportStage
);
- }
-
- private ConfigInfos validateKeyConverterConfig(
- Map<String, String> connectorConfig,
- ConfigValue keyConverterConfigValue,
- Function<String, TemporaryStage> reportStage
- ) {
- return validateConverterConfig(
- connectorConfig,
- keyConverterConfigValue,
+ ConfigInfos keyConverterConfigInfos = validateConverterConfig(
+ connectorProps,
+ validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG),
+ validatedConnectorConfig.get(KEY_CONVERTER_VERSION_CONFIG),
Converter.class,
Converter::config,
"key converter",
KEY_CONVERTER_CLASS_CONFIG,
+ KEY_CONVERTER_VERSION_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName()),
+ connectorLoader,
reportStage
);
- }
- private ConfigInfos validateValueConverterConfig(
- Map<String, String> connectorConfig,
- ConfigValue valueConverterConfigValue,
- Function<String, TemporaryStage> reportStage
- ) {
- return validateConverterConfig(
- connectorConfig,
- valueConverterConfigValue,
+ ConfigInfos valueConverterConfigInfos = validateConverterConfig(
+ connectorProps,
+ validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG),
+ validatedConnectorConfig.get(VALUE_CONVERTER_VERSION_CONFIG),
Converter.class,
Converter::config,
"value converter",
VALUE_CONVERTER_CLASS_CONFIG,
+ VALUE_CONVERTER_VERSION_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()),
+ connectorLoader,
reportStage
);
+ return mergeConfigInfos(connType, headerConverterConfigInfos, keyConverterConfigInfos, valueConverterConfigInfos);
}
@Override
@@ -634,6 +663,146 @@ protected boolean connectorUsesProducer(org.apache.kafka.connect.health.Connecto
|| SinkConnectorConfig.hasDlqTopicConfig(connProps);
}
+ private ConfigInfos validateClientOverrides(
+ Map<String, String> connectorProps,
+ org.apache.kafka.connect.health.ConnectorType connectorType,
+ Class<? extends Connector> connectorClass,
+ Function<String, TemporaryStage> reportStage,
+ boolean doLog
+ ) {
+ if (connectorClass == null || connectorType == null) {
+ return null;
+ }
+ AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps, doLog);
+ String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
+ String connType = connectorProps.get(CONNECTOR_CLASS_CONFIG);
+ ConfigInfos producerConfigInfos = null;
+ ConfigInfos consumerConfigInfos = null;
+ ConfigInfos adminConfigInfos = null;
+ String stageDescription = null;
+
+ if (connectorUsesProducer(connectorType, connectorProps)) {
+ stageDescription = "validating producer config overrides for the connector";
+ try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+ producerConfigInfos = validateClientOverrides(
+ connName,
+ ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
+ connectorConfig,
+ ProducerConfig.configDef(),
+ connectorClass,
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.PRODUCER,
+ connectorClientConfigOverridePolicy);
+ }
+ }
+ if (connectorUsesAdmin(connectorType, connectorProps)) {
+ stageDescription = "validating admin config overrides for the connector";
+ try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+ adminConfigInfos = validateClientOverrides(
+ connName,
+ ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
+ connectorConfig,
+ AdminClientConfig.configDef(),
+ connectorClass,
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.ADMIN,
+ connectorClientConfigOverridePolicy);
+ }
+ }
+ if (connectorUsesConsumer(connectorType, connectorProps)) {
+ stageDescription = "validating consumer config overrides for the connector";
+ try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+ consumerConfigInfos = validateClientOverrides(
+ connName,
+ ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+ connectorConfig,
+ ConsumerConfig.configDef(),
+ connectorClass,
+ connectorType,
+ ConnectorClientConfigRequest.ClientType.CONSUMER,
+ connectorClientConfigOverridePolicy);
+ }
+ }
+ return mergeConfigInfos(connType,
+ producerConfigInfos,
+ consumerConfigInfos,
+ adminConfigInfos
+ );
+ }
+
+ private ConfigInfos validateConnectorPluginSpecifiedConfigs(
+ Map<String, String> connectorProps,
+ Map<String, ConfigValue> validatedConnectorConfig,
+ ConfigDef enrichedConfigDef,
+ Connector connector,
+ Function<String, TemporaryStage> reportStage
+ ) {
+ List<ConfigValue> configValues = new ArrayList<>(validatedConnectorConfig.values());
+ Map<String, ConfigKey> configKeys = new LinkedHashMap<>(enrichedConfigDef.configKeys());
+ Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
+
+ String connType = connectorProps.get(CONNECTOR_CLASS_CONFIG);
+ // do custom connector-specific validation
+ ConfigDef configDef;
+ String stageDescription = "retrieving the configuration definition from the connector";
+ try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+ configDef = connector.config();
+ }
+ if (null == configDef) {
+ throw new BadRequestException(
+ String.format(
+ "%s.config() must return a ConfigDef that is not null.",
+ connector.getClass().getName()
+ )
+ );
+ }
+
+ Config config;
+ stageDescription = "performing multi-property validation for the connector";
+ try (TemporaryStage stage = reportStage.apply(stageDescription)) {
+ config = connector.validate(connectorProps);
+ }
+ if (null == config) {
+ throw new BadRequestException(
+ String.format(
+ "%s.validate() must return a Config that is not null.",
+ connector.getClass().getName()
+ )
+ );
+ }
+ configKeys.putAll(configDef.configKeys());
+ allGroups.addAll(configDef.groups());
+ configValues.addAll(config.configValues());
+ return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
+ }
+
+ private void addNullValuedErrors(Map<String, String> connectorProps, Map<String, ConfigValue> validatedConfig) {
+ connectorProps.entrySet().stream()
+ .filter(e -> e.getValue() == null)
+ .map(Map.Entry::getKey)
+ .forEach(prop ->
+ validatedConfig.computeIfAbsent(prop, ConfigValue::new)
+ .addErrorMessage("Null value can not be supplied as the configuration value."));
+ }
+
+ private ConfigInfos invalidVersionedConnectorValidation(
+ Map<String, String> connectorProps,
+ VersionedPluginLoadingException e,
+ Function<String, TemporaryStage> reportStage
+ ) {
+ String connType = connectorProps.get(CONNECTOR_CLASS_CONFIG);
+ ConfigDef configDef = ConnectorConfig.enrichedConfigDef(worker.getPlugins(), connType);
+ Map<String, ConfigValue> validatedConfig;
+ try (TemporaryStage stage = reportStage.apply("validating connector configuration")) {
+ validatedConfig = configDef.validateAll(connectorProps);
+ }
+ validatedConfig.get(CONNECTOR_CLASS_CONFIG).addErrorMessage(e.getMessage());
+ validatedConfig.get(CONNECTOR_VERSION).addErrorMessage(e.getMessage());
+ validatedConfig.get(CONNECTOR_VERSION).recommendedValues(e.availableVersions().stream().map(v -> (Object) v).collect(Collectors.toList()));
+ addNullValuedErrors(connectorProps, validatedConfig);
+ return generateResult(connType, configDef.configKeys(), new ArrayList<>(validatedConfig.values()), new ArrayList<>(configDef.groups()));
+ }
+
ConfigInfos validateConnectorConfig(
Map<String, String> connectorProps,
Function<String, TemporaryStage> reportStage,
@@ -646,150 +815,58 @@ ConfigInfos validateConnectorConfig(
connectorProps = worker.configTransformer().transform(connectorProps);
}
}
- String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
- if (connType == null)
+ String connType = connectorProps.get(CONNECTOR_CLASS_CONFIG);
+ if (connType == null) {
throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
+ }
+
+ VersionRange connVersion;
+ Connector connector;
+ ClassLoader connectorLoader;
+ try {
+ connVersion = PluginUtils.connectorVersionRequirement(connectorProps.get(CONNECTOR_VERSION));
+ connector = cachedConnectors.getConnector(connType, connVersion);
+ connectorLoader = plugins().pluginLoader(connType, connVersion);
+ log.info("Validating connector {}, version {}", connType, connector.version());
+ } catch (VersionedPluginLoadingException e) {
+ log.warn("Failed to load connector {} with version {}, skipping additional validations (connector, converters, transformations, client overrides) ",
+ connType, connectorProps.get(CONNECTOR_VERSION), e);
+ return invalidVersionedConnectorValidation(connectorProps, e, reportStage);
+ } catch (Exception e) {
+ throw new BadRequestException(e.getMessage(), e);
+ }
- Connector connector = getConnector(connType);
- ClassLoader connectorLoader = plugins().connectorLoader(connType);
try (LoaderSwap loaderSwap = plugins().withClassLoader(connectorLoader)) {
- org.apache.kafka.connect.health.ConnectorType connectorType;
+
ConfigDef enrichedConfigDef;
Map<String, ConfigValue> validatedConnectorConfig;
+ org.apache.kafka.connect.health.ConnectorType connectorType;
if (connector instanceof SourceConnector) {
connectorType = org.apache.kafka.connect.health.ConnectorType.SOURCE;
- enrichedConfigDef = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.configDef(), connectorProps, false);
+ enrichedConfigDef = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.enrichedConfigDef(plugins(), connectorProps, worker.config()), connectorProps, false);
stageDescription = "validating source connector-specific properties for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
validatedConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, connectorProps);
}
} else {
connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
- enrichedConfigDef = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.configDef(), connectorProps, false);
+ enrichedConfigDef = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.enrichedConfigDef(plugins(), connectorProps, worker.config()), connectorProps, false);
stageDescription = "validating sink connector-specific properties for the connector";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
validatedConnectorConfig = validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, connectorProps);
}
}
- connectorProps.entrySet().stream()
- .filter(e -> e.getValue() == null)
- .map(Map.Entry::getKey)
- .forEach(prop ->
- validatedConnectorConfig.computeIfAbsent(prop, ConfigValue::new)
- .addErrorMessage("Null value can not be supplied as the configuration value.")
- );
-
- List<ConfigValue> configValues = new ArrayList<>(validatedConnectorConfig.values());
- Map<String, ConfigKey> configKeys = new LinkedHashMap<>(enrichedConfigDef.configKeys());
- Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
+ addNullValuedErrors(connectorProps, validatedConnectorConfig);
- // do custom connector-specific validation
- ConfigDef configDef;
- stageDescription = "retrieving the configuration definition from the connector";
- try (TemporaryStage stage = reportStage.apply(stageDescription)) {
- configDef = connector.config();
- }
- if (null == configDef) {
- throw new BadRequestException(
- String.format(
- "%s.config() must return a ConfigDef that is not null.",
- connector.getClass().getName()
- )
- );
- }
+ ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
+ ConfigInfos converterConfigInfo = validateAllConverterConfigs(connectorProps, validatedConnectorConfig, connectorLoader, reportStage);
+ ConfigInfos clientOverrideInfo = validateClientOverrides(connectorProps, connectorType, connector.getClass(), reportStage, doLog);
- Config config;
- stageDescription = "performing multi-property validation for the connector";
- try (TemporaryStage stage = reportStage.apply(stageDescription)) {
- config = connector.validate(connectorProps);
- }
- if (null == config) {
- throw new BadRequestException(
- String.format(
- "%s.validate() must return a Config that is not null.",
- connector.getClass().getName()
- )
- );
- }
- configKeys.putAll(configDef.configKeys());
- allGroups.addAll(configDef.groups());
- configValues.addAll(config.configValues());
-
- // do custom converter-specific validation
- ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(
- connectorProps,
- validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG),
- reportStage
- );
- ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(
- connectorProps,
- validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG),
- reportStage
- );
- ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(
- connectorProps,
- validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG),
- reportStage
- );
-
- ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
- AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps, doLog);
- String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
- ConfigInfos producerConfigInfos = null;
- ConfigInfos consumerConfigInfos = null;
- ConfigInfos adminConfigInfos = null;
-
- if (connectorUsesProducer(connectorType, connectorProps)) {
- stageDescription = "validating producer config overrides for the connector";
- try (TemporaryStage stage = reportStage.apply(stageDescription)) {
- producerConfigInfos = validateClientOverrides(
- connName,
- ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
- connectorConfig,
- ProducerConfig.configDef(),
- connector.getClass(),
- connectorType,
- ConnectorClientConfigRequest.ClientType.PRODUCER,
- connectorClientConfigOverridePolicy);
- }
- }
- if (connectorUsesAdmin(connectorType, connectorProps)) {
- stageDescription = "validating admin config overrides for the connector";
- try (TemporaryStage stage = reportStage.apply(stageDescription)) {
- adminConfigInfos = validateClientOverrides(
- connName,
- ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
- connectorConfig,
- AdminClientConfig.configDef(),
- connector.getClass(),
- connectorType,
- ConnectorClientConfigRequest.ClientType.ADMIN,
- connectorClientConfigOverridePolicy);
- }
- }
- if (connectorUsesConsumer(connectorType, connectorProps)) {
- stageDescription = "validating consumer config overrides for the connector";
- try (TemporaryStage stage = reportStage.apply(stageDescription)) {
- consumerConfigInfos = validateClientOverrides(
- connName,
- ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
- connectorConfig,
- ConsumerConfig.configDef(),
- connector.getClass(),
- connectorType,
- ConnectorClientConfigRequest.ClientType.CONSUMER,
- connectorClientConfigOverridePolicy);
- }
- }
return mergeConfigInfos(connType,
- configInfos,
- producerConfigInfos,
- consumerConfigInfos,
- adminConfigInfos,
- headerConverterConfigInfos,
- keyConverterConfigInfos,
- valueConverterConfigInfos
+ connectorConfigInfo,
+ clientOverrideInfo,
+ converterConfigInfo
);
}
}
@@ -936,10 +1013,6 @@ private static ConfigValueInfo convertConfigValue(ConfigValue configValue, Type
return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible());
}
- protected Connector getConnector(String connType) {
- return tempConnectors.computeIfAbsent(connType, k -> plugins().newConnector(k));
- }
-
/**
* Retrieves ConnectorType for the class specified in the connector config
* @param connConfig the connector config, may be null
@@ -950,13 +1023,14 @@ public ConnectorType connectorType(Map<String, String> connConfig) {
if (connConfig == null) {
return ConnectorType.UNKNOWN;
}
- String connClass = connConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ String connClass = connConfig.get(CONNECTOR_CLASS_CONFIG);
if (connClass == null) {
return ConnectorType.UNKNOWN;
}
try {
- return ConnectorType.from(getConnector(connClass).getClass());
- } catch (ConnectException e) {
+ VersionRange range = PluginUtils.connectorVersionRequirement(connConfig.get(CONNECTOR_VERSION));
+ return ConnectorType.from(cachedConnectors.getConnector(connClass, range).getClass());
+ } catch (Exception e) {
log.warn("Unable to retrieve connector type", e);
return ConnectorType.UNKNOWN;
}
@@ -1078,25 +1152,33 @@ static Set<String> keysWithVariableValues(Map<String, String> rawConfig, Pattern
@Override
public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
+ return connectorPluginConfig(pluginName, null);
+ }
+
+ @Override
+ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName, VersionRange range) {
+
Plugins p = plugins();
Class<?> pluginClass;
try {
- pluginClass = p.pluginClass(pluginName);
+ pluginClass = p.pluginClass(pluginName, range);
} catch (ClassNotFoundException cnfe) {
throw new NotFoundException("Unknown plugin " + pluginName + ".");
+ } catch (VersionedPluginLoadingException e) {
+ throw new BadRequestException(e.getMessage(), e);
}
try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) {
- Object plugin = p.newPlugin(pluginName);
+ Object plugin = p.newPlugin(pluginName, range);
// Contains definitions coming from Connect framework
ConfigDef baseConfigDefs = null;
// Contains definitions specifically declared on the plugin
ConfigDef pluginConfigDefs;
if (plugin instanceof SinkConnector) {
- baseConfigDefs = SinkConnectorConfig.configDef();
+ baseConfigDefs = SinkConnectorConfig.enrichedConfigDef(p, pluginName);
pluginConfigDefs = ((SinkConnector) plugin).config();
} else if (plugin instanceof SourceConnector) {
- baseConfigDefs = SourceConnectorConfig.configDef();
+ baseConfigDefs = SourceConnectorConfig.enrichedConfigDef(p, pluginName);
pluginConfigDefs = ((SourceConnector) plugin).config();
} else if (plugin instanceof Converter) {
pluginConfigDefs = ((Converter) plugin).config();
@@ -1114,8 +1196,9 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
// give precedence to the one defined by the plugin class
// Preserve the ordering of properties as they're returned from each ConfigDef
Map<String, ConfigKey> configsMap = new LinkedHashMap<>(pluginConfigDefs.configKeys());
- if (baseConfigDefs != null)
+ if (baseConfigDefs != null) {
baseConfigDefs.configKeys().forEach(configsMap::putIfAbsent);
+ }
List<ConfigKeyInfo> results = new ArrayList<>();
for (ConfigKey configKey : configsMap.values()) {
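For orientation, the converter-loading path above now resolves a Maven version range with `PluginUtils.connectorVersionRequirement` and asks `Plugins` for a matching instance under the connector's class loader; a `VersionedPluginLoadingException` is reported on both the converter class and converter version properties instead of failing the request. A minimal sketch of that resolution step, assuming a worker-provided `Plugins` instance and placeholder class/range values:

```java
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
import org.apache.kafka.connect.storage.Converter;

import org.apache.maven.artifact.versioning.VersionRange;

class ConverterResolutionSketch {
    // 'plugins' and 'connectorLoader' would come from the running worker; the converter
    // class and version range below are placeholders for illustration only.
    static Converter load(Plugins plugins, ClassLoader connectorLoader) throws Exception {
        VersionRange range = PluginUtils.connectorVersionRequirement("[3.8,)");
        try {
            return (Converter) plugins.newPlugin(
                    "org.apache.kafka.connect.json.JsonConverter", range, connectorLoader);
        } catch (VersionedPluginLoadingException e) {
            // AbstractHerder attaches this message to both the converter class and the
            // converter version ConfigValues instead of aborting the whole validation.
            throw e;
        }
    }
}
```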
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
new file mode 100644
index 0000000000000..59c4281fff885
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
+
+import org.apache.maven.artifact.versioning.VersionRange;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CachedConnectors {
+
+ private static final String LATEST_VERSION = "latest";
+
+ private final Map<String, Map<String, Connector>> connectors;
+ private final Map<String, Exception> invalidConnectors;
+ private final Map<String, Map<String, VersionedPluginLoadingException>> invalidVersions;
+ private final Plugins plugins;
+
+ public CachedConnectors(Plugins plugins) {
+ this.plugins = plugins;
+ this.connectors = new ConcurrentHashMap<>();
+ this.invalidConnectors = new ConcurrentHashMap<>();
+ this.invalidVersions = new ConcurrentHashMap<>();
+ }
+
+ private void validate(String connectorName, VersionRange range) throws Exception {
+ if (invalidConnectors.containsKey(connectorName)) {
+ throw new Exception(invalidConnectors.get(connectorName));
+ }
+
+ String version = range == null ? LATEST_VERSION : range.toString();
+ if (invalidVersions.containsKey(connectorName) && invalidVersions.get(connectorName).containsKey(version)) {
+ throw new Exception(invalidVersions.get(connectorName).get(version));
+ }
+ }
+
+ private Connector lookup(String connectorName, VersionRange range) throws Exception {
+ String version = range == null ? LATEST_VERSION : range.toString();
+ if (connectors.containsKey(connectorName) && connectors.get(connectorName).containsKey(version)) {
+ return connectors.get(connectorName).get(version);
+ }
+
+ try {
+ Connector connector = plugins.newConnector(connectorName, range);
+ connectors.computeIfAbsent(connectorName, k -> new ConcurrentHashMap<>()).put(version, connector);
+ return connector;
+ } catch (VersionedPluginLoadingException e) {
+ invalidVersions.computeIfAbsent(connectorName, k -> new ConcurrentHashMap<>()).put(version, e);
+ throw e;
+ } catch (Exception e) {
+ invalidConnectors.put(connectorName, e);
+ throw e;
+ }
+ }
+
+ public Connector getConnector(String connectorName, VersionRange range) throws Exception {
+ validate(connectorName, range);
+ return lookup(connectorName, range);
+ }
+}
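The new `CachedConnectors` helper memoizes connector instances per connector class and version range, and also remembers failures so repeated validation requests fail fast instead of reloading plugins. A brief usage sketch under assumed inputs (the connector class name and version range are made-up examples, and `plugins` stands for the worker's `Plugins` instance):

```java
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.CachedConnectors;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;

import org.apache.maven.artifact.versioning.VersionRange;

class CachedConnectorsSketch {
    static Connector resolve(Plugins plugins) throws Exception {
        CachedConnectors cache = new CachedConnectors(plugins);

        // No range: the instance is cached under the sentinel key "latest".
        Connector latest = cache.getConnector("org.example.MySinkConnector", null);

        // Explicit Maven-style range: cached under its string form, here "[1.2,2.0)".
        VersionRange range = PluginUtils.connectorVersionRequirement("[1.2,2.0)");
        Connector pinned = cache.getConnector("org.example.MySinkConnector", range);

        // A VersionedPluginLoadingException from newConnector() is remembered for this
        // (class, version) pair, so subsequent lookups rethrow without reloading plugins.
        return pinned;
    }
}
```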
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index cb604ad73eef5..929dc57a37709 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -27,7 +27,10 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.PluginsRecommenders;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
@@ -35,6 +38,8 @@
import org.apache.kafka.connect.util.ConcreteSubClassValidator;
import org.apache.kafka.connect.util.InstantiableClassValidator;
+import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +87,11 @@ public class ConnectorConfig extends AbstractConfig {
" or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
private static final String CONNECTOR_CLASS_DISPLAY = "Connector class";
+ public static final String CONNECTOR_VERSION = "connector." + WorkerConfig.PLUGIN_VERSION_SUFFIX;
+ private static final String CONNECTOR_VERSION_DOC = "Version of the connector.";
+ private static final String CONNECTOR_VERSION_DISPLAY = "Connector version";
+ private static final ConfigDef.Validator CONNECTOR_VERSION_VALIDATOR = new PluginVersionValidator();
+
public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
@@ -90,6 +100,12 @@ public class ConnectorConfig extends AbstractConfig {
new InstantiableClassValidator()
);
+ public static final String KEY_CONVERTER_VERSION_CONFIG = WorkerConfig.KEY_CONVERTER_VERSION;
+ private static final String KEY_CONVERTER_VERSION_DOC = "Version of the key converter.";
+ private static final String KEY_CONVERTER_VERSION_DISPLAY = "Key converter version";
+ private static final ConfigDef.Validator KEY_CONVERTER_VERSION_VALIDATOR = new PluginVersionValidator();
+
+
public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
@@ -98,17 +114,24 @@ public class ConnectorConfig extends AbstractConfig {
new InstantiableClassValidator()
);
+ public static final String VALUE_CONVERTER_VERSION_CONFIG = WorkerConfig.VALUE_CONVERTER_VERSION;
+ private static final String VALUE_CONVERTER_VERSION_DOC = "Version of the value converter.";
+ private static final String VALUE_CONVERTER_VERSION_DISPLAY = "Value converter version";
+ private static final ConfigDef.Validator VALUE_CONVERTER_VERSION_VALIDATOR = new PluginVersionValidator();
+
public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
- // The Connector config should not have a default for the header converter, since the absence of a config property means that
- // the worker config settings should be used. Thus, we set the default to null here.
- public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
private static final ConfigDef.Validator HEADER_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(HeaderConverter.class),
new InstantiableClassValidator()
);
+ public static final String HEADER_CONVERTER_VERSION_CONFIG = WorkerConfig.HEADER_CONVERTER_VERSION;
+ private static final String HEADER_CONVERTER_VERSION_DOC = "Version of the header converter.";
+ private static final String HEADER_CONVERTER_VERSION_DISPLAY = "Header converter version";
+ private static final ConfigDef.Validator HEADER_CONVERTER_VERSION_VALIDATOR = new PluginVersionValidator();
+
public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
public static final int TASKS_MAX_DEFAULT = 1;
@@ -187,7 +210,11 @@ public class ConnectorConfig extends AbstractConfig {
public static final String CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX = "admin.override.";
public static final String PREDICATES_PREFIX = "predicates.";
- private final EnrichedConnectorConfig enrichedConfig;
+ private static final PluginsRecommenders EMPTY_RECOMMENDER = new PluginsRecommenders();
+ private static final ConverterDefaults CONVERTER_DEFAULTS = new ConverterDefaults(null, null);
+
+ private final ConnectorConfig.EnrichedConnectorConfig enrichedConfig;
+
private static class EnrichedConnectorConfig extends AbstractConfig {
EnrichedConnectorConfig(ConfigDef configDef, Map props) {
super(configDef, props);
@@ -199,17 +226,27 @@ public Object get(String key) {
}
}
- public static ConfigDef configDef() {
+ protected static ConfigDef configDef(
+ String defaultConnectorVersion,
+ ConverterDefaults keyConverterDefaults,
+ ConverterDefaults valueConverterDefaults,
+ ConverterDefaults headerConverterDefaults,
+ PluginsRecommenders recommender
+ ) {
int orderInGroup = 0;
int orderInErrorGroup = 0;
return new ConfigDef()
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
+ .define(CONNECTOR_VERSION, Type.STRING, defaultConnectorVersion, CONNECTOR_VERSION_VALIDATOR, Importance.MEDIUM, CONNECTOR_VERSION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONNECTOR_VERSION_DISPLAY, recommender.connectorPluginVersionRecommender())
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
.define(TASKS_MAX_ENFORCE_CONFIG, Type.BOOLEAN, TASKS_MAX_ENFORCE_DEFAULT, Importance.LOW, TASKS_MAX_ENFORCE_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASKS_MAX_ENFORCE_DISPLAY)
- .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
- .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
- .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
+ .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, keyConverterDefaults.type, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY, recommender.converterPluginRecommender())
+ .define(KEY_CONVERTER_VERSION_CONFIG, Type.STRING, keyConverterDefaults.version, KEY_CONVERTER_VERSION_VALIDATOR, Importance.LOW, KEY_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_VERSION_DISPLAY, recommender.keyConverterPluginVersionRecommender())
+ .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, valueConverterDefaults.type, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY, recommender.converterPluginRecommender())
+ .define(VALUE_CONVERTER_VERSION_CONFIG, Type.STRING, valueConverterDefaults.version, VALUE_CONVERTER_VERSION_VALIDATOR, Importance.LOW, VALUE_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_VERSION_DISPLAY, recommender.valueConverterPluginVersionRecommender())
+ .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, headerConverterDefaults.type, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY, recommender.headerConverterPluginRecommender())
+ .define(HEADER_CONVERTER_VERSION_CONFIG, Type.STRING, headerConverterDefaults.version, HEADER_CONVERTER_VERSION_VALIDATOR, Importance.LOW, HEADER_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_VERSION_DISPLAY, recommender.headerConverterPluginVersionRecommender())
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
@@ -226,6 +263,28 @@ public static ConfigDef configDef() {
ERRORS_LOG_ENABLE_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_LOG_ENABLE_DISPLAY)
.define(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, Type.BOOLEAN, ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT, Importance.MEDIUM,
ERRORS_LOG_INCLUDE_MESSAGES_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY);
+
+ }
+
+ public static ConfigDef configDef() {
+ return configDef(null, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
+ }
+
+ // ConfigDef with additional defaults and recommenders
+ public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, WorkerConfig workerConfig) {
+ PluginsRecommenders recommender = new PluginsRecommenders(plugins);
+ ConverterDefaults keyConverterDefaults = converterDefaults(plugins, KEY_CONVERTER_CLASS_CONFIG,
+ WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
+ ConverterDefaults valueConverterDefaults = converterDefaults(plugins, VALUE_CONVERTER_CLASS_CONFIG,
+ WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
+ ConverterDefaults headerConverterDefaults = converterDefaults(plugins, HEADER_CONVERTER_CLASS_CONFIG,
+ WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig, HeaderConverter.class);
+ return configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)),
+ keyConverterDefaults, valueConverterDefaults, headerConverterDefaults, recommender);
+ }
+
+ public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
+ return configDef(plugins.latestVersion(connectorClass), CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
}
private static ConfigDef.CompositeValidator aliasValidator(String kind) {
@@ -271,7 +330,7 @@ public long errorMaxDelayInMillis() {
public ToleranceType errorToleranceType() {
String tolerance = getString(ERRORS_TOLERANCE_CONFIG);
- for (ToleranceType type: ToleranceType.values()) {
+ for (ToleranceType type : ToleranceType.values()) {
if (type.name().equalsIgnoreCase(tolerance)) {
return type;
}
@@ -360,17 +419,17 @@ protected ConfigDef initialConfigDef() {
@Override
protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig) {
return super.configDefsForClass(typeConfig)
- .filter(entry -> {
- // The implicit parameters mask any from the transformer with the same name
- if (TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
- || TransformationStage.NEGATE_CONFIG.equals(entry.getKey())) {
- log.warn("Transformer config {} is masked by implicit config of that name",
- entry.getKey());
- return false;
- } else {
- return true;
- }
- });
+ .filter(entry -> {
+ // The implicit parameters mask any from the transformer with the same name
+ if (TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
+ || TransformationStage.NEGATE_CONFIG.equals(entry.getKey())) {
+ log.warn("Transformer config {} is masked by implicit config of that name",
+ entry.getKey());
+ return false;
+ } else {
+ return true;
+ }
+ });
}
@Override
@@ -405,6 +464,87 @@ protected ConfigDef config(Predicate> predicate) {
return newDef;
}
+ private static ConverterDefaults converterDefaults(
+ Plugins plugins,
+ String connectorConverterConfig,
+ String workerConverterConfig,
+ String workerConverterVersionConfig,
+ Map<String, String> connectorProps,
+ WorkerConfig workerConfig,
+ Class<?> converterType
+ ) {
+ /*
+ If a converter is specified in the connector config it overrides the worker config for the corresponding converter;
+ otherwise the worker config is used. Hence, if the converter is not provided in the connector config, the default
+ is the one provided in the worker config.
+
+ With multi-versioning support, which converter version is used depends on several factors:
+ A. If the converter class is provided as part of the connector properties
+ 1. if the version is not provided,
+ - if the converter is packaged with the connector, the packaged version is used
+ - if the converter is not packaged with the connector, the latest version is used
+ 2. if the version is provided, the provided version is used
+ B. If the converter class is not provided as part of the connector properties, but provided as part of the worker properties
+ 1. if the version is not provided, the latest version is used
+ 2. if the version is provided, the provided version is used
+ C. If the converter class is not provided as part of the connector properties and not provided as part of the worker properties,
+ the converter to use is unknown hence no default version can be determined (null)
+
+ Note: when using service loading, Connect has an issue outlined in KAFKA-18119, which means that the above
+ logic does not currently hold for clusters using service loading when converters are defined in the connector.
+ However, the logic to determine the default should ideally follow the one outlined above, and the code here
+ should still show the correct default version regardless of the bug.
+ */
+ final String connectorConverter = connectorProps.get(connectorConverterConfig);
+ // since header converter defines a default in the worker config we need to handle it separately
+ final String workerConverter = workerConverterConfig.equals(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG) ?
+ workerConfig.getClass(workerConverterConfig).getName() : workerConfig.originalsStrings().get(workerConverterConfig);
+ final String connectorClass = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ final String connectorVersion = connectorProps.get(ConnectorConfig.CONNECTOR_VERSION);
+ String type = null;
+ if (connectorClass == null || (connectorConverter == null && workerConverter == null)) {
+ return new ConverterDefaults(null, null);
+ }
+ // update the default of connector converter based on if the worker converter is provided
+ type = workerConverter;
+
+ String version = null;
+ if (connectorConverter != null) {
+ version = fetchPluginVersion(plugins, connectorClass, connectorVersion, connectorConverter);
+ } else {
+ version = workerConfig.originalsStrings().get(workerConverterVersionConfig);
+ if (version == null) {
+ version = plugins.latestVersion(workerConverter);
+ }
+ }
+ return new ConverterDefaults(type, version);
+ }
+
+ private static void updateKeyDefault(ConfigDef configDef, String versionConfigKey, String versionDefault) {
+ ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey);
+ if (key == null) {
+ return;
+ }
+ configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey(
+ versionConfigKey, key.type, versionDefault, key.validator, key.importance, key.documentation, key.group, key.orderInGroup, key.width, key.displayName, key.dependents, key.recommender, false
+ ));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName) {
+ if (pluginName == null) {
+ return null;
+ }
+ try {
+ VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion);
+ return plugins.pluginVersion(pluginName, plugins.pluginLoader(connectorClass, range));
+ } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
+ // these errors should be captured in other places, so we can ignore them here
+ log.warn("Failed to determine default plugin version for {}", connectorClass, e);
+ }
+ return null;
+ }
+
/**
* An abstraction over "enrichable plugins" ({@link Transformation}s and {@link Predicate}s) used for computing the
* contribution to a Connector's ConfigDef.
@@ -455,14 +595,14 @@ void enrich(ConfigDef newDef) {
final String typeConfig = prefix + "type";
final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with(
- (String name, Object value) -> {
- validateProps(prefix);
- // The value will be null if the class couldn't be found; no point in performing follow-up validation
- if (value != null) {
- getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
- }
- },
- () -> "valid configs for " + alias + " " + aliasKind.toLowerCase(Locale.ENGLISH));
+ (String name, Object value) -> {
+ validateProps(prefix);
+ // The value will be null if the class couldn't be found; no point in performing follow-up validation
+ if (value != null) {
+ getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+ }
+ },
+ () -> "valid configs for " + alias + " " + aliasKind.toLowerCase(Locale.ENGLISH));
newDef.define(typeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
"Class for the '" + alias + "' " + aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
baseClass.getSimpleName() + " type for " + alias,
@@ -475,7 +615,8 @@ void enrich(ConfigDef newDef) {
}
/** Subclasses can add extra validation of the {@link #props}. */
- protected void validateProps(String prefix) { }
+ protected void validateProps(String prefix) {
+ }
/**
* Populates the ConfigDef according to the configs returned from {@code configs()} method of class
@@ -486,7 +627,6 @@ protected ConfigDef populateConfigDef(String typeConfig) {
try {
configDefsForClass(typeConfig)
.forEach(entry -> configDef.define(entry.getValue()));
-
} catch (ConfigException e) {
if (requireFullConfig) {
throw e;
@@ -533,10 +673,10 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
ConfigDef configDef = config(pluginInstance);
if (null == configDef) {
throw new ConnectException(
- String.format(
- "%s.config() must return a ConfigDef that is not null.",
- cls.getName()
- )
+ String.format(
+ "%s.config() must return a ConfigDef that is not null.",
+ cls.getName()
+ )
);
}
return configDef;
@@ -576,4 +716,34 @@ public boolean visible(String name, Map<String, Object> parsedConfig) {
}
}
+ private static class ConverterDefaults {
+ private final String type;
+ private final String version;
+
+ public ConverterDefaults(String type, String version) {
+ this.type = type;
+ this.version = version;
+ }
+
+ public String type() {
+ return type;
+ }
+
+ public String version() {
+ return version;
+ }
+ }
+
+ public static class PluginVersionValidator implements ConfigDef.Validator {
+
+ @Override
+ public void ensureValid(String name, Object value) {
+
+ try {
+ PluginUtils.connectorVersionRequirement((String) value);
+ } catch (InvalidVersionSpecificationException e) {
+ throw new ConfigException(name, value, e.getMessage());
+ }
+ }
+ }
}
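The `PluginVersionValidator` added above delegates to `PluginUtils.connectorVersionRequirement`, which accepts either a plain version or a Maven version-range expression and rejects malformed specs. A hedged illustration of values it should accept or reject (the property name is only an example):

```java
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.ConnectorConfig;

class VersionValidatorSketch {
    public static void main(String[] args) {
        ConnectorConfig.PluginVersionValidator validator = new ConnectorConfig.PluginVersionValidator();

        // Plain versions and Maven ranges should both parse cleanly.
        validator.ensureValid("connector.plugin.version", "3.9.0");
        validator.ensureValid("connector.plugin.version", "[3.8,4.0)");

        // A malformed range surfaces as a ConfigException carrying the parser message.
        try {
            validator.ensureValid("connector.plugin.version", "[3.8");
        } catch (ConfigException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Leaving the version property unset falls back to the latest (or connector-packaged) version, per the defaulting logic described in the converter-defaults comment above.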
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 52be401bbfaba..859e3f2728e12 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -32,6 +32,8 @@
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.maven.artifact.versioning.VersionRange;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -322,6 +324,8 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
*/
List<ConfigKeyInfo> connectorPluginConfig(String pluginName);
+ List<ConfigKeyInfo> connectorPluginConfig(String pluginName, VersionRange version);
+
/**
* Get the current offsets for a connector.
* @param connName the name of the connector whose offsets are to be retrieved
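The `Herder` interface now exposes a versioned variant of `connectorPluginConfig`. A small sketch of how a caller such as the REST resource might use it; `herder` is assumed to be the running herder, and the plugin name and range are illustrative:

```java
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;

import org.apache.maven.artifact.versioning.VersionRange;

import java.util.List;

class VersionedPluginConfigSketch {
    static List<ConfigKeyInfo> describe(Herder herder) throws Exception {
        // Resolve the requested range once; null would mean "latest".
        VersionRange range = PluginUtils.connectorVersionRequirement("[1.0,2.0)");
        // New overload: config keys of the plugin version matching the range.
        return herder.connectorPluginConfig("org.example.MySourceConnector", range);
    }
}
```

In `AbstractHerder`, the existing single-argument method simply delegates to this overload with a null range.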
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 2ab7dfa089763..4584255e23132 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -73,19 +73,29 @@ public class SinkConnectorConfig extends ConnectorConfig {
"keys, all error context header keys will start with __connect.errors.
";
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
- static final ConfigDef CONFIG = ConnectorConfig.configDef()
- .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
- .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
- .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
- .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
- .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
+ private static ConfigDef configDef(ConfigDef baseConfigs) {
+ return baseConfigs
+ .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
+ .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
+ .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
+ .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+ .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
+ }
public static ConfigDef configDef() {
- return CONFIG;
+ return configDef(ConnectorConfig.configDef());
+ }
+
+ public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, WorkerConfig workerConfig) {
+ return configDef(ConnectorConfig.enrichedConfigDef(plugins, connProps, workerConfig));
+ }
+
+ public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
+ return configDef(ConnectorConfig.enrichedConfigDef(plugins, connectorClass));
}
public SinkConnectorConfig(Plugins plugins, Map props) {
- super(plugins, CONFIG, props);
+ super(plugins, configDef(), props);
}
/**
@@ -206,6 +216,6 @@ public boolean enableErrantRecordReporter() {
}
public static void main(String[] args) {
- System.out.println(CONFIG.toHtml(4, config -> "sinkconnectorconfigs_" + config));
+ System.out.println(configDef().toHtml(4, config -> "sinkconnectorconfigs_" + config));
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index bc797563b10dd..effa31353764c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -125,10 +125,10 @@ private static class EnrichedSourceConnectorConfig extends ConnectorConfig {
private final EnrichedSourceConnectorConfig enrichedSourceConfig;
private final String offsetsTopic;
- public static ConfigDef configDef() {
+ private static ConfigDef configDef(ConfigDef baseConfigDef) {
ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
int orderInGroup = 0;
- return new ConfigDef(ConnectorConfig.configDef())
+ return new ConfigDef(baseConfigDef)
.define(
TOPIC_CREATION_GROUPS_CONFIG,
ConfigDef.Type.LIST,
@@ -203,6 +203,18 @@ public static ConfigDef configDef() {
OFFSETS_TOPIC_DISPLAY);
}
+ public static ConfigDef configDef() {
+ return configDef(ConnectorConfig.configDef());
+ }
+
+ public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, WorkerConfig workerConfig) {
+ return configDef(ConnectorConfig.enrichedConfigDef(plugins, connProps, workerConfig));
+ }
+
+ public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
+ return configDef(ConnectorConfig.enrichedConfigDef(plugins, connectorClass));
+ }
+
public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
String defaultGroup = "default";
ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 591e9816a7a50..1f97a907b642e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -278,6 +278,10 @@ public void stop() {
ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
+ public WorkerConfig config() {
+ return config;
+ }
+
/**
* Start a connector managed by this worker.
*
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index ca188ffd97af7..a68cdb4ea03d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -70,6 +70,8 @@ public class WorkerConfig extends AbstractConfig {
public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+ public static final String PLUGIN_VERSION_SUFFIX = "plugin.version";
+
public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
public static final String KEY_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@@ -77,6 +79,10 @@ public class WorkerConfig extends AbstractConfig {
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro.";
+ public static final String KEY_CONVERTER_VERSION = "key.converter." + PLUGIN_VERSION_SUFFIX;
+ public static final String KEY_CONVERTER_VERSION_DEFAULT = null;
+ public static final String KEY_CONVERTER_VERSION_DOC = "Version of the key converter.";
+
public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@@ -84,6 +90,10 @@ public class WorkerConfig extends AbstractConfig {
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro.";
+ public static final String VALUE_CONVERTER_VERSION = "value.converter." + PLUGIN_VERSION_SUFFIX;
+ public static final String VALUE_CONVERTER_VERSION_DEFAULT = null;
+ public static final String VALUE_CONVERTER_VERSION_DOC = "Version of the value converter.";
+
public static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
public static final String HEADER_CONVERTER_CLASS_DOC =
"HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@@ -93,6 +103,10 @@ public class WorkerConfig extends AbstractConfig {
" header values to strings and deserialize them by inferring the schemas.";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();
+ public static final String HEADER_CONVERTER_VERSION = "header.converter." + PLUGIN_VERSION_SUFFIX;
+ public static final String HEADER_CONVERTER_VERSION_DEFAULT = null;
+ public static final String HEADER_CONVERTER_VERSION_DOC = "Version of the header converter.";
+
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
@@ -200,8 +214,12 @@ protected static ConfigDef baseConfigDef() {
CLIENT_DNS_LOOKUP_DOC)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
+ .define(KEY_CONVERTER_VERSION, Type.STRING,
+ KEY_CONVERTER_VERSION_DEFAULT, Importance.LOW, KEY_CONVERTER_VERSION_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
+ .define(VALUE_CONVERTER_VERSION, Type.STRING,
+ VALUE_CONVERTER_VERSION_DEFAULT, Importance.LOW, VALUE_CONVERTER_VERSION_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
@@ -237,6 +255,8 @@ protected static ConfigDef baseConfigDef() {
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
+ .define(HEADER_CONVERTER_VERSION, Type.STRING,
+ HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW, HEADER_CONVERTER_VERSION_DOC)
.define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
Collections.emptyList(),
Importance.LOW, CONFIG_PROVIDERS_DOC)
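As a hedged illustration of how the new worker-level *.plugin.version properties defined above are expected to be supplied (the version strings below are purely illustrative; the defaults are null, i.e. no explicit pin):

    import java.util.HashMap;
    import java.util.Map;

    class WorkerVersionPropsSketch {
        static Map<String, String> workerProps() {
            Map<String, String> props = new HashMap<>();
            props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
            props.put("key.converter.plugin.version", "3.8.0");    // KEY_CONVERTER_VERSION
            props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
            props.put("value.converter.plugin.version", "3.8.0");  // VALUE_CONVERTER_VERSION
            props.put("header.converter.plugin.version", "3.8.0"); // HEADER_CONVERTER_VERSION
            return props;
        }
    }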
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 2ca7979a52636..6a85043928751 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -139,6 +139,23 @@ String latestVersion(String classOrAlias) {
return inner.lastKey().version();
}
+ String versionInLocation(String classOrAlias, String location) {
+ if (classOrAlias == null) {
+ return null;
+ }
+ String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+ SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
+ if (inner == null) {
+ return null;
+ }
+ for (Map.Entry<PluginDesc<?>, ClassLoader> entry : inner.entrySet()) {
+ if (entry.getKey().location().equals(location)) {
+ return entry.getKey().version();
+ }
+ }
+ return null;
+ }
+
private ClassLoader findPluginLoader(
SortedMap<PluginDesc<?>, ClassLoader> loaders,
String pluginName,
@@ -226,8 +243,12 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
));
}
- List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream()
+ // If a plugin implements two interfaces (e.g. JsonConverter implements both Converter and HeaderConverter),
+ // it will have two classpath entries, one from each scan. Hence, we count distinct versions.
+ List<String> classpathPlugins = scannedPlugin.keySet().stream()
.filter(pluginDesc -> pluginDesc.location().equals("classpath"))
+ .map(PluginDesc::version)
+ .distinct()
.collect(Collectors.toList());
if (classpathPlugins.size() > 1) {
@@ -239,7 +260,7 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
} else if (classpathPlugins.isEmpty()) {
throw new VersionedPluginLoadingException("Invalid plugin found in classpath");
} else {
- pluginVersion = classpathPlugins.get(0).version();
+ pluginVersion = classpathPlugins.get(0);
if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the required version range %s",
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index ff575e8edf890..1fecb59ddb579 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -60,8 +60,9 @@ public String toString() {
", location='" + location + '\'' +
'}';
}
+
@JsonIgnore
- DefaultArtifactVersion encodedVersion() {
+ public DefaultArtifactVersion encodedVersion() {
return encodedVersion;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 56567b3bee709..ae6d3ba3a1cd4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -493,6 +493,7 @@ private static Collection<URL> distinctUrls(Collection<URL> urls) {
}
return distinct.values();
}
+
public static VersionRange connectorVersionRequirement(String version) throws InvalidVersionSpecificationException {
if (version == null || version.equals("latest")) {
return null;
@@ -500,13 +501,13 @@ public static VersionRange connectorVersionRequirement(String version) throws In
version = version.trim();
// check first if the given version is valid
- VersionRange.createFromVersionSpec(version);
+ VersionRange range = VersionRange.createFromVersionSpec(version);
- // now if the version is not enclosed we consider it as a hard requirement and enclose it in []
- if (!version.startsWith("[") && !version.startsWith("(")) {
- version = "[" + version + "]";
+ if (range.hasRestrictions()) {
+ return range;
}
+ // a bare version spec has no restrictions; treat it as a hard requirement by enclosing it in []
+ version = "[" + version + "]";
return VersionRange.createFromVersionSpec(version);
}
-
}
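A hedged sketch of the intended semantics of connectorVersionRequirement after this change (the version strings are illustrative):

    import org.apache.kafka.connect.runtime.isolation.PluginUtils;
    import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
    import org.apache.maven.artifact.versioning.VersionRange;

    class VersionRequirementSketch {
        static void demo() throws InvalidVersionSpecificationException {
            // null or "latest" means no requirement at all: the method returns null.
            VersionRange latest = PluginUtils.connectorVersionRequirement("latest");
            // A bare version spec has no restrictions of its own, so it is wrapped in []
            // and becomes a hard requirement for exactly that version.
            VersionRange pinned = PluginUtils.connectorVersionRequirement("3.5.0");     // [3.5.0]
            // An explicit range spec already has restrictions and is returned unchanged.
            VersionRange ranged = PluginUtils.connectorVersionRequirement("[3.0,4.0)"); // [3.0,4.0)
        }
    }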
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 28c1f80c61815..b8027d71d5675 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -269,6 +269,13 @@ public String latestVersion(String classOrAlias) {
return delegatingLoader.latestVersion(classOrAlias);
}
+ public String pluginVersion(String classOrAlias, ClassLoader sourceLoader) {
+ if (!(sourceLoader instanceof PluginClassLoader)) {
+ return latestVersion(classOrAlias);
+ }
+ return delegatingLoader.versionInLocation(classOrAlias, ((PluginClassLoader) sourceLoader).location());
+ }
+
public DelegatingClassLoader delegatingLoader() {
return delegatingLoader;
}
@@ -278,7 +285,7 @@ public ClassLoader connectorLoader(String connectorClassOrAlias) {
return delegatingLoader.loader(connectorClassOrAlias);
}
- public ClassLoader pluginLoader(String classOrAlias, VersionRange range) throws ClassNotFoundException, VersionedPluginLoadingException {
+ public ClassLoader pluginLoader(String classOrAlias, VersionRange range) {
return delegatingLoader.loader(classOrAlias, range);
}
@@ -298,7 +305,7 @@ public Set<PluginDesc<SinkConnector>> sinkConnectors() {
return scanResult.sinkConnectors();
}
- public Set<PluginDesc<SinkConnector>> sinkConnectors(String connectorClassOrAlias) {
+ Set<PluginDesc<SinkConnector>> sinkConnectors(String connectorClassOrAlias) {
return pluginsOfClass(connectorClassOrAlias, scanResult.sinkConnectors());
}
@@ -306,7 +313,7 @@ public Set<PluginDesc<SourceConnector>> sourceConnectors() {
return scanResult.sourceConnectors();
}
- public Set<PluginDesc<SourceConnector>> sourceConnectors(String connectorClassOrAlias) {
+ Set<PluginDesc<SourceConnector>> sourceConnectors(String connectorClassOrAlias) {
return pluginsOfClass(connectorClassOrAlias, scanResult.sourceConnectors());
}
@@ -367,6 +374,13 @@ public Object newPlugin(String classOrAlias, VersionRange range) throws Versione
return newPlugin(klass);
}
+ public Object newPlugin(String classOrAlias, VersionRange range, ClassLoader sourceLoader) throws ClassNotFoundException {
+ if (range == null && sourceLoader instanceof PluginClassLoader) {
+ sourceLoader.loadClass(classOrAlias);
+ }
+ return newPlugin(classOrAlias, range);
+ }
+
public Connector newConnector(String connectorClassOrAlias) {
Class<? extends Connector> klass = connectorClass(connectorClassOrAlias);
return newPlugin(klass);
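A hedged sketch of how the new pluginVersion overload might be used to report the converter version actually visible from a connector's class loader; the helper method and class names here are illustrative, and a configured Plugins instance is assumed:

    import org.apache.kafka.connect.runtime.isolation.Plugins;

    class PluginVersionSketch {
        // Resolves the version of a converter as seen from a given connector's loader.
        // Falls back to the latest installed version when the loader is not an isolated
        // PluginClassLoader (e.g. the connector lives on the classpath).
        static String converterVersionFor(Plugins plugins, String connectorAlias, String converterClass) {
            ClassLoader connectorLoader = plugins.connectorLoader(connectorAlias);
            return plugins.pluginVersion(converterClass, connectorLoader);
        }
    }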
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
new file mode 100644
index 0000000000000..8cf209ac08c91
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class PluginsRecommenders {
+
+ private final Plugins plugins;
+ private final ConverterPluginRecommender converterPluginRecommender;
+ private final ConnectorPluginVersionRecommender connectorPluginVersionRecommender;
+ private final HeaderConverterPluginRecommender headerConverterPluginRecommender;
+ private final KeyConverterPluginVersionRecommender keyConverterPluginVersionRecommender;
+ private final ValueConverterPluginVersionRecommender valueConverterPluginVersionRecommender;
+ private final HeaderConverterPluginVersionRecommender headerConverterPluginVersionRecommender;
+
+ public PluginsRecommenders() {
+ this(null);
+ }
+
+ public PluginsRecommenders(Plugins plugins) {
+ this.plugins = plugins;
+ this.converterPluginRecommender = new ConverterPluginRecommender();
+ this.connectorPluginVersionRecommender = new ConnectorPluginVersionRecommender();
+ this.headerConverterPluginRecommender = new HeaderConverterPluginRecommender();
+ this.keyConverterPluginVersionRecommender = new KeyConverterPluginVersionRecommender();
+ this.valueConverterPluginVersionRecommender = new ValueConverterPluginVersionRecommender();
+ this.headerConverterPluginVersionRecommender = new HeaderConverterPluginVersionRecommender();
+ }
+
+ public ConverterPluginRecommender converterPluginRecommender() {
+ return converterPluginRecommender;
+ }
+
+ public ConnectorPluginVersionRecommender connectorPluginVersionRecommender() {
+ return connectorPluginVersionRecommender;
+ }
+
+ public HeaderConverterPluginRecommender headerConverterPluginRecommender() {
+ return headerConverterPluginRecommender;
+ }
+
+ public KeyConverterPluginVersionRecommender keyConverterPluginVersionRecommender() {
+ return keyConverterPluginVersionRecommender;
+ }
+
+ public ValueConverterPluginVersionRecommender valueConverterPluginVersionRecommender() {
+ return valueConverterPluginVersionRecommender;
+ }
+
+ public HeaderConverterPluginVersionRecommender headerConverterPluginVersionRecommender() {
+ return headerConverterPluginVersionRecommender;
+ }
+
+ public class ConnectorPluginVersionRecommender implements ConfigDef.Recommender {
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public List