diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 43772802070af..2fb62b45edead 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -859,9 +859,11 @@ ConfigInfos validateConnectorConfig( addNullValuedErrors(connectorProps, validatedConnectorConfig); - ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage); + // the order of operations here is important, converter validations can add error messages to the connector config + // which are collected and converted to ConfigInfos in validateConnectorPluginSpecifiedConfigs ConfigInfos converterConfigInfo = validateAllConverterConfigs(connectorProps, validatedConnectorConfig, connectorLoader, reportStage); ConfigInfos clientOverrideInfo = validateClientOverrides(connectorProps, connectorType, connector.getClass(), reportStage, doLog); + ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage); return mergeConfigInfos(connType, connectorConfigInfo, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java index 59c4281fff885..ebfa3522f90d2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException; @@ -31,8 +32,8 @@ public class CachedConnectors { private static final String LATEST_VERSION = "latest"; private final Map> connectors; - private final Map invalidConnectors; - private final Map> invalidVersions; + private final Map invalidConnectors; + private final Map> invalidVersions; private final Plugins plugins; public CachedConnectors(Plugins plugins) { @@ -42,14 +43,14 @@ public CachedConnectors(Plugins plugins) { this.invalidVersions = new ConcurrentHashMap<>(); } - private void validate(String connectorName, VersionRange range) throws Exception { + private void validate(String connectorName, VersionRange range) throws ConnectException, VersionedPluginLoadingException { if (invalidConnectors.containsKey(connectorName)) { - throw new Exception(invalidConnectors.get(connectorName)); + throw new ConnectException(invalidConnectors.get(connectorName)); } String version = range == null ? LATEST_VERSION : range.toString(); if (invalidVersions.containsKey(connectorName) && invalidVersions.get(connectorName).containsKey(version)) { - throw new Exception(invalidVersions.get(connectorName).get(version)); + throw new VersionedPluginLoadingException(invalidVersions.get(connectorName).get(version).getMessage()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 929dc57a37709..81d8f6d03bfa2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.PluginUtils; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.PluginsRecommenders; @@ -274,17 +275,17 @@ public static ConfigDef configDef() { public static ConfigDef enrichedConfigDef(Plugins plugins, Map connProps, WorkerConfig workerConfig) { PluginsRecommenders recommender = new PluginsRecommenders(plugins); ConverterDefaults keyConverterDefaults = converterDefaults(plugins, KEY_CONVERTER_CLASS_CONFIG, - WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, Converter.class); + WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, PluginType.CONVERTER); ConverterDefaults valueConverterDefaults = converterDefaults(plugins, VALUE_CONVERTER_CLASS_CONFIG, - WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, Converter.class); + WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, PluginType.CONVERTER); ConverterDefaults headerConverterDefaults = converterDefaults(plugins, HEADER_CONVERTER_CLASS_CONFIG, - WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig, HeaderConverter.class); - return configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), + WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig, PluginType.HEADER_CONVERTER); + return configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG), PluginType.SINK, PluginType.SOURCE), keyConverterDefaults, valueConverterDefaults, headerConverterDefaults, recommender); } public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) { - return configDef(plugins.latestVersion(connectorClass), CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER); + return configDef(plugins.latestVersion(connectorClass, PluginType.SINK, PluginType.SOURCE), CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER); } private static ConfigDef.CompositeValidator aliasValidator(String kind) { @@ -395,10 +396,9 @@ public > List> transformationS *

* {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown. */ - @SuppressWarnings({"rawtypes", "unchecked"}) public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map props, boolean requireFullConfig) { ConfigDef newDef = new ConfigDef(baseConfigDef); - new EnrichablePlugin>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class, + new EnrichablePlugin>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, PluginType.TRANSFORMATION, props, requireFullConfig) { @Override @@ -417,8 +417,8 @@ protected ConfigDef initialConfigDef() { } @Override - protected Stream> configDefsForClass(String typeConfig) { - return super.configDefsForClass(typeConfig) + protected Stream> configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) { + return super.configDefsForClass(typeConfig, versionConfig, plugins) .filter(entry -> { // The implicit parameters mask any from the transformer with the same name if (TransformationStage.PREDICATE_CONFIG.equals(entry.getKey()) @@ -447,10 +447,16 @@ protected void validateProps(String prefix) { "but there is no config '" + prefixedPredicate + "' defining a predicate to be negated."); } } - }.enrich(newDef); + + @Override + protected ConfigDef.Recommender versionRecommender(String typeConfig) { + return new PluginsRecommenders(plugins).transformationPluginRecommender(typeConfig); + } + + }.enrich(newDef, plugins); new EnrichablePlugin>("Predicate", PREDICATES_CONFIG, PREDICATES_GROUP, - (Class) Predicate.class, props, requireFullConfig) { + PluginType.PREDICATE, props, requireFullConfig) { @Override protected Set>> plugins() { return plugins.predicates(); @@ -460,7 +466,14 @@ protected Set>> plugins() { protected ConfigDef config(Predicate predicate) { return predicate.config(); } - }.enrich(newDef); + + @Override + protected ConfigDef.Recommender versionRecommender(String typeConfig) { + return new PluginsRecommenders(plugins).predicatePluginRecommender(typeConfig); + } + + }.enrich(newDef, plugins); + return newDef; } @@ -471,7 +484,7 @@ private static ConverterDefaults converterDefaults( String workerConverterVersionConfig, Map connectorProps, WorkerConfig workerConfig, - Class converterType + PluginType converterType ) { /* if a converter is specified in the connector config it overrides the worker config for the corresponding converter @@ -510,34 +523,23 @@ the converter to use is unknown hence no default version can be determined (null String version = null; if (connectorConverter != null) { - version = fetchPluginVersion(plugins, connectorConverter, connectorVersion, connectorConverter); + version = fetchPluginVersion(plugins, connectorClass, connectorVersion, connectorConverter, converterType); } else { version = workerConfig.originalsStrings().get(workerConverterVersionConfig); if (version == null) { - version = plugins.latestVersion(workerConverter); + version = plugins.latestVersion(workerConverter, converterType); } } return new ConverterDefaults(type, version); } - private static void updateKeyDefault(ConfigDef configDef, String versionConfigKey, String versionDefault) { - ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey); - if (key == null) { - return; - } - configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey( - versionConfigKey, key.type, versionDefault, key.validator, key.importance, key.documentation, key.group, key.orderInGroup, key.width, key.displayName, key.dependents, key.recommender, false - )); - } - - @SuppressWarnings("unchecked") - private static String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName) { - if (pluginName == null) { + private static String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName, PluginType pluginType) { + if (pluginName == null || connectorClass == null) { return null; } try { VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion); - return plugins.pluginVersion(pluginName, plugins.pluginLoader(connectorClass, range)); + return plugins.pluginVersion(pluginName, plugins.pluginLoader(connectorClass, range), pluginType); } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) { // these errors should be captured in other places, so we can ignore them here log.warn("Failed to determine default plugin version for {}", connectorClass, e); @@ -559,24 +561,27 @@ abstract static class EnrichablePlugin { private final String aliasKind; private final String aliasConfig; private final String aliasGroup; + private final PluginType pluginType; private final Class baseClass; private final Map props; private final boolean requireFullConfig; + @SuppressWarnings("unchecked") public EnrichablePlugin( String aliasKind, - String aliasConfig, String aliasGroup, Class baseClass, + String aliasConfig, String aliasGroup, PluginType pluginType, Map props, boolean requireFullConfig) { this.aliasKind = aliasKind; this.aliasConfig = aliasConfig; this.aliasGroup = aliasGroup; - this.baseClass = baseClass; + this.pluginType = pluginType; + this.baseClass = (Class) pluginType.superClass(); this.props = props; this.requireFullConfig = requireFullConfig; } /** Add the configs for this alias to the given {@code ConfigDef}. */ - void enrich(ConfigDef newDef) { + void enrich(ConfigDef newDef, Plugins plugins) { Object aliases = ConfigDef.parseType(aliasConfig, props.get(aliasConfig), Type.LIST); if (!(aliases instanceof List)) { return; @@ -594,12 +599,17 @@ void enrich(ConfigDef newDef) { int orderInGroup = 0; final String typeConfig = prefix + "type"; + final String versionConfig = prefix + WorkerConfig.PLUGIN_VERSION_SUFFIX; + final String defaultVersion = fetchPluginVersion(plugins, props.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG), + props.get(ConnectorConfig.CONNECTOR_VERSION), props.get(typeConfig), pluginType); + + // Add the class configuration final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with( (String name, Object value) -> { validateProps(prefix); // The value will be null if the class couldn't be found; no point in performing follow-up validation if (value != null) { - getConfigDefFromConfigProvidingClass(typeConfig, (Class) value); + getConfigDefFromPlugin(typeConfig, ((Class) value).getName(), props.getOrDefault(versionConfig, defaultVersion), plugins); } }, () -> "valid configs for " + alias + " " + aliasKind.toLowerCase(Locale.ENGLISH)); @@ -608,7 +618,25 @@ void enrich(ConfigDef newDef) { baseClass.getSimpleName() + " type for " + alias, Collections.emptyList(), new ClassRecommender()); - final ConfigDef configDef = populateConfigDef(typeConfig); + // Add the version configuration + final ConfigDef.Validator versionValidator = (name, value) -> { + if (value != null) { + try { + getConfigDefFromPlugin(typeConfig, props.get(typeConfig), (String) value, plugins); + } catch (VersionedPluginLoadingException e) { + throw e; + } catch (Exception e) { + // ignore any other exception here as they are not related to version validation and + // will be captured in the validation of the class configuration + } + } + }; + newDef.define(versionConfig, Type.STRING, defaultVersion, versionValidator, Importance.HIGH, + "Version of the '" + alias + "' " + aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG, + baseClass.getSimpleName() + " version for " + alias, + Collections.emptyList(), versionRecommender(typeConfig)); + + final ConfigDef configDef = populateConfigDef(typeConfig, versionConfig, plugins); if (configDef == null) continue; newDef.embed(prefix, group, orderInGroup, configDef); } @@ -622,10 +650,10 @@ protected void validateProps(String prefix) { * Populates the ConfigDef according to the configs returned from {@code configs()} method of class * named in the {@code ...type} parameter of the {@code props}. */ - protected ConfigDef populateConfigDef(String typeConfig) { + protected ConfigDef populateConfigDef(String typeConfig, String versionConfig, Plugins plugins) { final ConfigDef configDef = initialConfigDef(); try { - configDefsForClass(typeConfig) + configDefsForClass(typeConfig, versionConfig, plugins) .forEach(entry -> configDef.define(entry.getValue())); } catch (ConfigException e) { if (requireFullConfig) { @@ -641,9 +669,11 @@ protected ConfigDef populateConfigDef(String typeConfig) { * Return a stream of configs provided by the {@code configs()} method of class * named in the {@code ...type} parameter of the {@code props}. */ - protected Stream> configDefsForClass(String typeConfig) { - final Class cls = (Class) ConfigDef.parseType(typeConfig, props.get(typeConfig), Type.CLASS); - return getConfigDefFromConfigProvidingClass(typeConfig, cls) + protected Stream> configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) { + if (props.get(typeConfig) == null) { + throw new ConfigException(typeConfig, null, "Not a " + baseClass.getSimpleName()); + } + return getConfigDefFromPlugin(typeConfig, props.get(typeConfig), props.get(versionConfig), plugins) .configKeys().entrySet().stream(); } @@ -652,30 +682,46 @@ protected ConfigDef initialConfigDef() { return new ConfigDef(); } - /** - * Return {@link ConfigDef} from {@code cls}, which is expected to be a non-null {@code Class}, - * by instantiating it and invoking {@link #config(T)}. - * @param key - * @param cls The subclass of the baseclass. - */ - ConfigDef getConfigDefFromConfigProvidingClass(String key, Class cls) { - if (cls == null) { - throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName()); + @SuppressWarnings("unchecked") + ConfigDef getConfigDefFromPlugin(String key, String pluginClass, String version, Plugins plugins) { + String connectorClass = props.get(CONNECTOR_CLASS_CONFIG); + if (pluginClass == null || connectorClass == null) { + // if transformation class is null or connector class is null, we return empty as these validations are done in respective validators + return new ConfigDef(); + } + VersionRange connectorVersionRange; + try { + connectorVersionRange = PluginUtils.connectorVersionRequirement(props.get(CONNECTOR_VERSION)); + } catch (InvalidVersionSpecificationException e) { + // this should be caught in connector version validation + return new ConfigDef(); } + + VersionRange pluginVersion; + try { + pluginVersion = PluginUtils.connectorVersionRequirement(version); + } catch (InvalidVersionSpecificationException e) { + throw new VersionedPluginLoadingException(e.getMessage()); + } + + // validate that the plugin class is a subclass of the base class + final Class cls = (Class) ConfigDef.parseType(key, props.get(key), Type.CLASS); Utils.ensureConcreteSubclass(baseClass, cls); - T pluginInstance; + T plugin; try { - pluginInstance = Utils.newInstance(cls, baseClass); + plugin = (T) plugins.newPlugin(pluginClass, pluginVersion, plugins.pluginLoader(connectorClass, connectorVersionRange)); + } catch (VersionedPluginLoadingException e) { + throw e; } catch (Exception e) { - throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage()); + throw new ConfigException(key, pluginClass, "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage()); } - ConfigDef configDef = config(pluginInstance); + ConfigDef configDef = config(plugin); if (null == configDef) { throw new ConnectException( String.format( "%s.config() must return a ConfigDef that is not null.", - cls.getName() + plugin.getClass().getName() ) ); } @@ -695,6 +741,8 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class cls) { */ protected abstract Set> plugins(); + protected abstract ConfigDef.Recommender versionRecommender(String typeConfig); + /** * Recommend bundled transformations or predicates. */ @@ -742,7 +790,7 @@ public void ensureValid(String name, Object value) { try { PluginUtils.connectorVersionRequirement((String) value); } catch (InvalidVersionSpecificationException e) { - throw new ConfigException(name, value, e.getMessage()); + throw new VersionedPluginLoadingException(e.getMessage()); } } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index 6a85043928751..bdeb224cfde60 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.isolation; -import org.apache.maven.artifact.versioning.ArtifactVersion; import org.apache.maven.artifact.versioning.DefaultArtifactVersion; import org.apache.maven.artifact.versioning.VersionRange; import org.slf4j.Logger; @@ -28,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -127,19 +127,7 @@ String resolveFullClassName(String classOrAlias) { return aliases.getOrDefault(classOrAlias, classOrAlias); } - String latestVersion(String classOrAlias) { - if (classOrAlias == null) { - return null; - } - String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); - SortedMap, ClassLoader> inner = pluginLoaders.get(fullName); - if (inner == null) { - return null; - } - return inner.lastKey().version(); - } - - String versionInLocation(String classOrAlias, String location) { + PluginDesc pluginDesc(String classOrAlias, String preferredLocation, Set allowedTypes) { if (classOrAlias == null) { return null; } @@ -148,12 +136,17 @@ String versionInLocation(String classOrAlias, String location) { if (inner == null) { return null; } + PluginDesc result = null; for (Map.Entry, ClassLoader> entry : inner.entrySet()) { - if (entry.getKey().location().equals(location)) { - return entry.getKey().version(); + if (!allowedTypes.contains(entry.getKey().type())) { + continue; + } + result = entry.getKey(); + if (result.location().equals(preferredLocation)) { + return result; } } - return null; + return result; } private ClassLoader findPluginLoader( @@ -170,7 +163,6 @@ private ClassLoader findPluginLoader( + "Provided soft version: %s ", range)); } - ArtifactVersion version = null; ClassLoader loader = null; for (Map.Entry, ClassLoader> entry : loaders.entrySet()) { // the entries should be in sorted order of versions so this should end up picking the latest version which matches the range @@ -227,19 +219,19 @@ protected Class loadVersionedPluginClass( if (range == null) { return plugin; } - verifyClasspathVersionedPlugin(name, plugin, range); + verifyClasspathVersionedPlugin(fullName, plugin, range); } return plugin; } - private void verifyClasspathVersionedPlugin(String name, Class plugin, VersionRange range) throws VersionedPluginLoadingException { + private void verifyClasspathVersionedPlugin(String fullName, Class plugin, VersionRange range) throws VersionedPluginLoadingException { String pluginVersion; - SortedMap, ClassLoader> scannedPlugin = pluginLoaders.get(name); + SortedMap, ClassLoader> scannedPlugin = pluginLoaders.get(fullName); if (scannedPlugin == null) { throw new VersionedPluginLoadingException(String.format( "Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)", - name + fullName )); } @@ -255,7 +247,7 @@ private void verifyClasspathVersionedPlugin(String name, Class plugin, Versio throw new VersionedPluginLoadingException(String.format( "Plugin %s has multiple versions specified in class path, " + "only one version is allowed in class path for loading a plugin with version range", - name + fullName )); } else if (classpathPlugins.isEmpty()) { throw new VersionedPluginLoadingException("Invalid plugin found in classpath"); @@ -264,7 +256,7 @@ private void verifyClasspathVersionedPlugin(String name, Class plugin, Versio if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) { throw new VersionedPluginLoadingException(String.format( "Plugin %s has version %s which does not match the required version range %s", - name, + fullName, pluginVersion, range ), Collections.singletonList(pluginVersion)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index b8027d71d5675..94bb65078f789 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -45,6 +45,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -265,15 +266,17 @@ public Function safeLoaderSwapper() { }; } - public String latestVersion(String classOrAlias) { - return delegatingLoader.latestVersion(classOrAlias); + public String latestVersion(String classOrAlias, PluginType... allowedTypes) { + return pluginVersion(classOrAlias, null, allowedTypes); } - public String pluginVersion(String classOrAlias, ClassLoader sourceLoader) { - if (!(sourceLoader instanceof PluginClassLoader)) { - return latestVersion(classOrAlias); + public String pluginVersion(String classOrAlias, ClassLoader sourceLoader, PluginType... allowedTypes) { + String location = (sourceLoader instanceof PluginClassLoader) ? ((PluginClassLoader) sourceLoader).location() : null; + PluginDesc desc = delegatingLoader.pluginDesc(classOrAlias, location, new HashSet<>(Arrays.asList(allowedTypes))); + if (desc != null) { + return desc.version(); } - return delegatingLoader.versionInLocation(classOrAlias, ((PluginClassLoader) sourceLoader).location()); + return null; } public DelegatingClassLoader delegatingLoader() { @@ -376,7 +379,7 @@ public Object newPlugin(String classOrAlias, VersionRange range) throws Versione public Object newPlugin(String classOrAlias, VersionRange range, ClassLoader sourceLoader) throws ClassNotFoundException { if (range == null && sourceLoader instanceof PluginClassLoader) { - sourceLoader.loadClass(classOrAlias); + return newPlugin(sourceLoader.loadClass(classOrAlias)); } return newPlugin(classOrAlias, range); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java index 8cf209ac08c91..76f28659726df 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java @@ -19,10 +19,13 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.predicates.Predicate; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,6 +77,14 @@ public HeaderConverterPluginVersionRecommender headerConverterPluginVersionRecom return headerConverterPluginVersionRecommender; } + public TransformationPluginRecommender transformationPluginRecommender(String classOrAlias) { + return new TransformationPluginRecommender(classOrAlias); + } + + public PredicatePluginRecommender predicatePluginRecommender(String classOrAlias) { + return new PredicatePluginRecommender(classOrAlias); + } + public class ConnectorPluginVersionRecommender implements ConfigDef.Recommender { @SuppressWarnings({"unchecked", "rawtypes"}) @@ -195,4 +206,60 @@ protected Function> recommendations() { .map(PluginDesc::version).distinct().collect(Collectors.toList()); } } + + // Recommender for transformation and predicate plugins + public abstract class SMTPluginRecommender implements ConfigDef.Recommender { + + protected abstract Function>> plugins(); + + protected final String classOrAliasConfig; + + public SMTPluginRecommender(String classOrAliasConfig) { + this.classOrAliasConfig = classOrAliasConfig; + } + + @Override + @SuppressWarnings({"rawtypes"}) + public List validValues(String name, Map parsedConfig) { + if (plugins == null) { + return Collections.emptyList(); + } + if (parsedConfig.get(classOrAliasConfig) == null) { + return Collections.emptyList(); + } + + Class classOrAlias = (Class) parsedConfig.get(classOrAliasConfig); + return plugins().apply(classOrAlias.getName()) + .stream().map(PluginDesc::version).distinct().collect(Collectors.toList()); + } + + @Override + public boolean visible(String name, Map parsedConfig) { + return true; + } + } + + public class TransformationPluginRecommender extends SMTPluginRecommender> { + + public TransformationPluginRecommender(String classOrAliasConfig) { + super(classOrAliasConfig); + } + + @Override + protected Function>>> plugins() { + return plugins::transformations; + } + } + + public class PredicatePluginRecommender extends SMTPluginRecommender> { + + public PredicatePluginRecommender(String classOrAliasConfig) { + super(classOrAliasConfig); + } + + @Override + protected Function>>> plugins() { + return plugins::predicates; + } + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 61a69842d559b..6b67fb30c0055 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -62,6 +62,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -559,12 +560,14 @@ public void testConfigValidationTopicsRegexWithDlq() { } @Test - public void testConfigValidationTransformsExtendResults() { + @SuppressWarnings("rawtypes") + public void testConfigValidationTransformsExtendResults() throws ClassNotFoundException { final Class connectorClass = SampleSourceConnector.class; AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy); // 2 transform aliases defined -> 2 plugin lookups - when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc())); + Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc())); + Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), null, classLoader)).thenReturn(new SampleTransformation()); // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing // class info that should generate an error. @@ -575,6 +578,7 @@ public void testConfigValidationTransformsExtendResults() { config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName()); config.put("required", "value"); // connector required config ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false); + assertEquals(herder.connectorType(config), ConnectorType.SOURCE); // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on @@ -596,7 +600,7 @@ public void testConfigValidationTransformsExtendResults() { assertEquals(1, result.errorCount()); Map infos = result.values().stream() .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity())); - assertEquals(31, infos.size()); + assertEquals(33, infos.size()); // Should get 2 type fields from the transforms, first adds its own config since it has a valid class assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name()); @@ -611,12 +615,15 @@ public void testConfigValidationTransformsExtendResults() { } @Test - public void testConfigValidationPredicatesExtendResults() { + @SuppressWarnings("rawtypes") + public void testConfigValidationPredicatesExtendResults() throws ClassNotFoundException { final Class connectorClass = SampleSourceConnector.class; AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy); - when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc())); - when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc())); + Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc())); + Mockito.lenient().when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc())); + Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), null, classLoader)).thenReturn(new SampleTransformation()); + Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null, classLoader)).thenReturn(new SamplePredicate()); // Define 2 predicates. One has a class defined and so can get embedded configs, the other is missing // class info that should generate an error. @@ -653,7 +660,7 @@ public void testConfigValidationPredicatesExtendResults() { assertEquals(1, result.errorCount()); Map infos = result.values().stream() .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity())); - assertEquals(33, infos.size()); + assertEquals(36, infos.size()); // Should get 2 type fields from the transforms, first adds its own config since it has a valid class assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name()); assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index 6092f8ca7bdc7..a5af1d134692c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -41,6 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ConnectorConfigTest> { @@ -455,13 +457,19 @@ public static class Value> extends AbstractKeyValueTr } @Test - public void testEnrichedConfigDef() { + @SuppressWarnings("rawtypes") + public void testEnrichedConfigDef() throws ClassNotFoundException { String alias = "hdt"; String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + "."; Map props = new HashMap<>(); props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName()); props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName()); - ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false); + Plugins mockPlugins = mock(Plugins.class); + when(mockPlugins.newPlugin(HasDuplicateConfigTransformation.class.getName(), + null, (ClassLoader) null)).thenReturn(new HasDuplicateConfigTransformation()); + when(mockPlugins.transformations()).thenReturn(Collections.emptySet()); + ConfigDef def = ConnectorConfig.enrich(mockPlugins, new ConfigDef(), props, false); assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN); assertEnrichedConfigDef(def, prefix, TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING); assertEnrichedConfigDef(def, prefix, TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java index 2a6c0ed2b9d1d..ef7f17e1d09a2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java @@ -29,12 +29,16 @@ import org.apache.kafka.connect.transforms.SetSchemaMetadata; import org.apache.kafka.connect.transforms.TimestampConverter; import org.apache.kafka.connect.transforms.TimestampRouter; +import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.ValueToKey; import org.junit.jupiter.api.Test; import java.util.HashMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Tests that transformations' configs can be composed with ConnectorConfig during its construction, ensuring no * conflicting fields or other issues. @@ -42,8 +46,19 @@ * This test appears here simply because it requires both connect-runtime and connect-transforms and connect-runtime * already depends on connect-transforms. */ +@SuppressWarnings("rawtypes") public class TransformationConfigTest { + private Plugins setupMockPlugins(Transformation transformation) { + Plugins plugins = mock(Plugins.class); + try { + when(plugins.newPlugin(transformation.getClass().getName(), null, (ClassLoader) null)).thenReturn(transformation); + } catch (ClassNotFoundException e) { + // Shouldn't happen since we're mocking the plugins + } + return plugins; + } + @Test public void testEmbeddedConfigCast() { // Validate that we can construct a Connector config containing the extended config for the transform @@ -54,7 +69,7 @@ public void testEmbeddedConfigCast() { connProps.put("transforms.example.type", Cast.Value.class.getName()); connProps.put("transforms.example.spec", "int8"); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new Cast.Value()); new ConnectorConfig(plugins, connProps); } @@ -68,7 +83,7 @@ public void testEmbeddedConfigExtractField() { connProps.put("transforms.example.type", ExtractField.Value.class.getName()); connProps.put("transforms.example.field", "field"); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new ExtractField.Value()); new ConnectorConfig(plugins, connProps); } @@ -81,7 +96,7 @@ public void testEmbeddedConfigFlatten() { connProps.put("transforms", "example"); connProps.put("transforms.example.type", Flatten.Value.class.getName()); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new Flatten.Value()); new ConnectorConfig(plugins, connProps); } @@ -95,7 +110,7 @@ public void testEmbeddedConfigHoistField() { connProps.put("transforms.example.type", HoistField.Value.class.getName()); connProps.put("transforms.example.field", "field"); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new HoistField.Value()); new ConnectorConfig(plugins, connProps); } @@ -108,7 +123,7 @@ public void testEmbeddedConfigInsertField() { connProps.put("transforms", "example"); connProps.put("transforms.example.type", InsertField.Value.class.getName()); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new InsertField.Value()); new ConnectorConfig(plugins, connProps); } @@ -123,7 +138,7 @@ public void testEmbeddedConfigMaskField() { connProps.put("transforms.example.fields", "field"); connProps.put("transforms.example.replacement", "nothing"); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new MaskField.Value()); new ConnectorConfig(plugins, connProps); } @@ -138,7 +153,7 @@ public void testEmbeddedConfigRegexRouter() { connProps.put("transforms.example.regex", "(.*)"); connProps.put("transforms.example.replacement", "prefix-$1"); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new RegexRouter()); new ConnectorConfig(plugins, connProps); } @@ -151,7 +166,7 @@ public void testEmbeddedConfigReplaceField() { connProps.put("transforms", "example"); connProps.put("transforms.example.type", ReplaceField.Value.class.getName()); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new ReplaceField.Value()); new ConnectorConfig(plugins, connProps); } @@ -164,7 +179,7 @@ public void testEmbeddedConfigSetSchemaMetadata() { connProps.put("transforms", "example"); connProps.put("transforms.example.type", SetSchemaMetadata.Value.class.getName()); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new SetSchemaMetadata.Value()); new ConnectorConfig(plugins, connProps); } @@ -178,7 +193,7 @@ public void testEmbeddedConfigTimestampConverter() { connProps.put("transforms.example.type", TimestampConverter.Value.class.getName()); connProps.put("transforms.example.target.type", "unix"); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new TimestampConverter.Value()); new ConnectorConfig(plugins, connProps); } @@ -191,7 +206,7 @@ public void testEmbeddedConfigTimestampRouter() { connProps.put("transforms", "example"); connProps.put("transforms.example.type", TimestampRouter.class.getName()); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new TimestampRouter()); new ConnectorConfig(plugins, connProps); } @@ -205,7 +220,7 @@ public void testEmbeddedConfigValueToKey() { connProps.put("transforms.example.type", ValueToKey.class.getName()); connProps.put("transforms.example.fields", "field"); - Plugins plugins = null; // Safe when we're only constructing the config + Plugins plugins = setupMockPlugins(new ValueToKey()); new ConnectorConfig(plugins, connProps); }