Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18215: KIP-891 Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) #17741

Merged
merged 99 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 85 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
49b37c7
Add multiversioning apis to isolation
snehashisp Aug 23, 2024
1b740cc
add method for header converter
snehashisp Aug 23, 2024
3aba36e
init commit
snehashisp Aug 31, 2024
c96a8d1
update validation logic for connector versions
snehashisp Sep 19, 2024
017df62
add getter for all plugins for a class
snehashisp Sep 19, 2024
0f1cdaf
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 19, 2024
855e77a
minor edits
snehashisp Sep 21, 2024
4744f4f
Allow super to load class but do a version check
snehashisp Sep 21, 2024
c708ade
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 21, 2024
8beac31
validate the version when loaded from parent correctly
snehashisp Sep 21, 2024
df16999
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 21, 2024
6fb6471
Tested connector multiple verions
snehashisp Sep 22, 2024
84ed0da
revert changes to filestream sink
snehashisp Sep 22, 2024
814ddb7
bug in loadclass, should use fullname
snehashisp Sep 22, 2024
7905708
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 22, 2024
7497ef7
add available version to exception and minor changes
snehashisp Sep 22, 2024
27aba58
Merge branch 'multiversioning' into mvn-validation
snehashisp Sep 22, 2024
b9c0ab5
validation supporting connector version
snehashisp Oct 1, 2024
d9cad93
fix lf to cr
snehashisp Oct 1, 2024
7df37cf
Add configs and recommendors
snehashisp Oct 1, 2024
cbbbc95
converter tested
snehashisp Oct 2, 2024
5282a3f
Add converter validatoins
snehashisp Oct 2, 2024
7ad8a32
Remove changes to json
snehashisp Oct 5, 2024
2119a91
Add default version getter
snehashisp Oct 28, 2024
eb3e693
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 28, 2024
f973fd0
add a null check
snehashisp Oct 29, 2024
cce53be
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 29, 2024
baea3cf
Update log and comment
snehashisp Oct 29, 2024
5238dd0
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 29, 2024
efbe73b
adress backwards compatiblity
snehashisp Oct 30, 2024
ddec0d8
Rename default version to latest version
snehashisp Oct 30, 2024
ab0c8d8
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 30, 2024
d5f765b
Add logic for getting defaults
snehashisp Oct 30, 2024
84fb4fe
fix worker converter fetching
snehashisp Oct 31, 2024
9ded1af
fix header converter version
snehashisp Oct 31, 2024
a385dd4
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 31, 2024
d929a02
remove unwanted imports
snehashisp Oct 31, 2024
8f79bc3
Update imports
snehashisp Oct 31, 2024
9604dfe
refactor default version getting logic
snehashisp Oct 31, 2024
74a0d8f
Use current classloader when version is not present
snehashisp Oct 31, 2024
7190808
Merge branch 'multiversioning' into mvn-validation
snehashisp Oct 31, 2024
bfa73e7
Remove transformation and predicate getters
snehashisp Nov 9, 2024
0cc0f8a
Merge branch 'multiversioning' into mvn-validation
snehashisp Nov 9, 2024
fbbb0b8
add rawtype annotation
snehashisp Nov 9, 2024
bd4b665
Merge branch 'multiversioning' into mvn-validation
snehashisp Nov 9, 2024
5d7a335
remove json changes
snehashisp Nov 10, 2024
69381f3
Add version configs to worker
snehashisp Nov 10, 2024
8c334b9
Add a static loader swap method
snehashisp Nov 13, 2024
4a8f0d2
Merge branch 'multiversioning' into mvn-validation
snehashisp Nov 13, 2024
8ac4d56
Minor updates to get connectors
snehashisp Nov 24, 2024
27cb9b1
add comment for servie loading bug and rename some methods
snehashisp Nov 29, 2024
5fef3f4
Resolve comments on 1st review
snehashisp Dec 5, 2024
5e9f3c7
Remove extra code
snehashisp Dec 5, 2024
d796f22
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
b2b51c6
Add plugin version utils for parsing versions
snehashisp Dec 5, 2024
da2c5f5
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
f8e2837
Remove unused imports
snehashisp Dec 5, 2024
aff9ac1
Use PluginVersionUtils version range requirement
snehashisp Dec 5, 2024
1c35aa9
Should return loader
snehashisp Dec 5, 2024
9c1c50d
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
8d64cfa
address first set of comments
snehashisp Dec 5, 2024
73ecfb1
Add a delegated newPlugin class
snehashisp Dec 5, 2024
037d9af
Update abstract herder converter plugin fetching
snehashisp Dec 5, 2024
155271b
fix incorrect classloader equality check
snehashisp Dec 5, 2024
1d41648
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
8c88d4c
fix whitespace issue
snehashisp Dec 5, 2024
d6e3392
Update version loading logic and add return delegating loader
snehashisp Dec 5, 2024
f969159
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
b6e2226
Combine loader logic
snehashisp Dec 5, 2024
e05bf9e
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 5, 2024
7d995ee
Use instanced PluginVersionUtil
snehashisp Dec 5, 2024
1ac60e4
Update default fetching and recommendor logic
snehashisp Dec 6, 2024
821a7fe
Don't expose some plugins methods
snehashisp Dec 6, 2024
b3c28f7
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 6, 2024
261e697
Update suffix for verison
snehashisp Dec 6, 2024
5b41efe
add safe swap loader instead of static swaploader
snehashisp Dec 6, 2024
773d9b0
Merge branch 'multiversioning' into mvn-validation
snehashisp Dec 6, 2024
dbbc843
Merge branch 'trunk' of https://github.com/apache/kafka into mvn-vali…
snehashisp Dec 8, 2024
0e89ba4
Cleanup and changes post multiversioning merge
snehashisp Dec 8, 2024
17ceac2
fix checkstyle
snehashisp Dec 8, 2024
3e6c910
Update and Isolate getConnector logic
snehashisp Dec 8, 2024
47d4991
fix tests for herders
snehashisp Dec 8, 2024
b0aee4c
attempt to fix large diff in distributed herder
snehashisp Dec 8, 2024
e0c3b16
spotlessJavaFix
snehashisp Dec 9, 2024
9a8fc1b
updates based on comments
snehashisp Dec 9, 2024
7efacbf
remove plugins.recommendor, use header converter defaults
snehashisp Dec 9, 2024
b980c03
minor changes
snehashisp Dec 9, 2024
871ae0a
Do validations for invalid versions
snehashisp Dec 10, 2024
f7d964d
Update plugin loading logic
snehashisp Dec 10, 2024
cba6924
test fix and minor adjustments
snehashisp Dec 10, 2024
520681c
bug fixes
snehashisp Dec 11, 2024
dc4f686
revert unwanted changes
snehashisp Dec 11, 2024
18b8d16
add recommendor for invalid version, attempt to remove unwanted update
snehashisp Dec 11, 2024
978e84b
remove changes in connector config
snehashisp Dec 11, 2024
b29b373
more unwanted change removal
snehashisp Dec 11, 2024
a0f2bd0
fix test
snehashisp Dec 11, 2024
72d86aa
checkstyle fix and remove gitignore changes
snehashisp Dec 11, 2024
14cf633
remove unwanted changes
snehashisp Dec 11, 2024
357d461
newline in gitignore
snehashisp Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@ storage/kafka-tiered-storage/
docker/test/report_*.html
kafka.Kafka
__pycache__
/connect/runtime/src/main/java/org/apache/kafka/connect/testing/
/connect/file/
/connect/json/
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@
<allow pkg="org.apache.kafka.connect" />
<allow pkg="io.github.classgraph"/>
<allow pkg="javax.crypto"/>
<allow pkg="org.apache.maven.artifact.versioning" />
<allow pkg="org.eclipse.jetty.util" />
<allow pkg="org.apache.log4j" />

Expand All @@ -583,7 +584,6 @@

<subpackage name="isolation">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.maven.artifact.versioning" />
<allow pkg="javax.tools" />
</subpackage>

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;

import org.apache.maven.artifact.versioning.VersionRange;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachedConnectors {

private static final String LATEST_VERSION = "latest";

private final Map<String, Map<String, Connector>> connectors;
private final Map<String, Exception> invalidConnectors;
private final Map<String, Map<String, Exception>> invalidVersions;
private final Plugins plugins;

public CachedConnectors(Plugins plugins) {
this.plugins = plugins;
this.connectors = new ConcurrentHashMap<>();
this.invalidConnectors = new ConcurrentHashMap<>();
this.invalidVersions = new ConcurrentHashMap<>();
}

private void validate(String connectorName, VersionRange range) throws Exception {
if (invalidConnectors.containsKey(connectorName)) {
throw new Exception(invalidConnectors.get(connectorName));
}

String version = range == null ? LATEST_VERSION : range.toString();
if (invalidVersions.containsKey(connectorName) && invalidVersions.get(connectorName).containsKey(version)) {
throw new Exception(invalidVersions.get(connectorName).get(version));
}
}

private Connector lookup(String connectorName, VersionRange range) throws Exception {
String version = range == null ? LATEST_VERSION : range.toString();
if (connectors.containsKey(connectorName) && connectors.get(connectorName).containsKey(version)) {
return connectors.get(connectorName).get(version);
}

try {
Connector connector = plugins.newConnector(connectorName, range);
connectors.computeIfAbsent(connectorName, k -> new ConcurrentHashMap<>()).put(version, connector);
return connector;
} catch (VersionedPluginLoadingException e) {
invalidVersions.computeIfAbsent(connectorName, k -> new ConcurrentHashMap<>()).put(version, e);
throw e;
} catch (Exception e) {
invalidConnectors.put(connectorName, e);
throw e;
}
}

public Connector getConnector(String connectorName, VersionRange range) throws Exception {
validate(connectorName, range);
return lookup(connectorName, range);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;

import org.apache.maven.artifact.versioning.VersionRange;

import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -322,6 +324,8 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
*/
List<ConfigKeyInfo> connectorPluginConfig(String pluginName);

List<ConfigKeyInfo> connectorPluginConfig(String pluginName, VersionRange version);

/**
* Get the current offsets for a connector.
* @param connName the name of the connector whose offsets are to be retrieved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,29 @@ public class SinkConnectorConfig extends ConnectorConfig {
"keys, all error context header keys will start with <code>__connect.errors.</code>";
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";

static final ConfigDef CONFIG = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
private static ConfigDef configDef(ConfigDef baseConfigs) {
return baseConfigs
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
}

public static ConfigDef configDef() {
return CONFIG;
return configDef(ConnectorConfig.configDef());
}

public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, Map<String, String> workerProps) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connProps, workerProps));
}

public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connectorClass));
}

public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
super(plugins, CONFIG, props);
super(plugins, configDef(), props);
}

/**
Expand Down Expand Up @@ -206,6 +216,6 @@ public boolean enableErrantRecordReporter() {
}

public static void main(String[] args) {
System.out.println(CONFIG.toHtml(4, config -> "sinkconnectorconfigs_" + config));
System.out.println(configDef().toHtml(4, config -> "sinkconnectorconfigs_" + config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ private static class EnrichedSourceConnectorConfig extends ConnectorConfig {
private final EnrichedSourceConnectorConfig enrichedSourceConfig;
private final String offsetsTopic;

public static ConfigDef configDef() {
private static ConfigDef configDef(ConfigDef baseConfigDef) {
ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
int orderInGroup = 0;
return new ConfigDef(ConnectorConfig.configDef())
return new ConfigDef(baseConfigDef)
.define(
TOPIC_CREATION_GROUPS_CONFIG,
ConfigDef.Type.LIST,
Expand Down Expand Up @@ -203,6 +203,18 @@ public static ConfigDef configDef() {
OFFSETS_TOPIC_DISPLAY);
}

public static ConfigDef configDef() {
return configDef(ConnectorConfig.configDef());
}

public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, Map<String, String> workerProps) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connProps, workerProps));
}

public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connectorClass));
}

public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
String defaultGroup = "default";
ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ public void stop() {
ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}

public WorkerConfig config() {
return config;
}

/**
* Start a connector managed by this worker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,30 @@ public class WorkerConfig extends AbstractConfig {
public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;

public static final String PLUGIN_VERSION_SUFFIX = "version";
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved

public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
public static final String KEY_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the keys in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro.";

public static final String KEY_CONVERTER_VERSION = "key.converter." + PLUGIN_VERSION_SUFFIX;
public static final String KEY_CONVERTER_VERSION_DEFAULT = null;
public static final String KEY_CONVERTER_VERSION_DOC = "Version of the key converter.";

public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the values in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro.";

public static final String VALUE_CONVERTER_VERSION = "value.converter." + PLUGIN_VERSION_SUFFIX;
public static final String VALUE_CONVERTER_VERSION_DEFAULT = null;
public static final String VALUE_CONVERTER_VERSION_DOC = "Version of the value converter.";

public static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
public static final String HEADER_CONVERTER_CLASS_DOC =
"HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
Expand All @@ -93,6 +103,10 @@ public class WorkerConfig extends AbstractConfig {
" header values to strings and deserialize them by inferring the schemas.";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();

public static final String HEADER_CONVERTER_VERSION = "header.converter." + PLUGIN_VERSION_SUFFIX;
public static final String HEADER_CONVERTER_VERSION_DEFAULT = null;
public static final String HEADER_CONVERTER_VERSION_DOC = "Version of the header converter.";

public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
Expand Down Expand Up @@ -200,8 +214,12 @@ protected static ConfigDef baseConfigDef() {
CLIENT_DNS_LOOKUP_DOC)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(KEY_CONVERTER_VERSION, Type.STRING,
KEY_CONVERTER_VERSION_DEFAULT, Importance.LOW, KEY_CONVERTER_VERSION_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_VERSION, Type.STRING,
VALUE_CONVERTER_VERSION_DEFAULT, Importance.LOW, VALUE_CONVERTER_VERSION_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
Expand Down Expand Up @@ -237,6 +255,8 @@ protected static ConfigDef baseConfigDef() {
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
.define(HEADER_CONVERTER_VERSION, Type.STRING,
HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW, HEADER_CONVERTER_VERSION_DOC)
.define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
Collections.emptyList(),
Importance.LOW, CONFIG_PROVIDERS_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
name
));
} else if (classpathPlugins.isEmpty()) {

gharris1727 marked this conversation as resolved.
Show resolved Hide resolved

throw new VersionedPluginLoadingException("Invalid plugin found in classpath");
} else {
pluginVersion = classpathPlugins.get(0).version();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public String toString() {
", location='" + location + '\'' +
'}';
}

@JsonIgnore
DefaultArtifactVersion encodedVersion() {
public DefaultArtifactVersion encodedVersion() {
return encodedVersion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public ClassLoader connectorLoader(String connectorClassOrAlias) {
return delegatingLoader.loader(connectorClassOrAlias);
}

public ClassLoader pluginLoader(String classOrAlias, VersionRange range) throws ClassNotFoundException, VersionedPluginLoadingException {
public ClassLoader pluginLoader(String classOrAlias, VersionRange range) {
return delegatingLoader.loader(classOrAlias, range);
}

Expand All @@ -298,15 +298,15 @@ public Set<PluginDesc<SinkConnector>> sinkConnectors() {
return scanResult.sinkConnectors();
}

public Set<PluginDesc<SinkConnector>> sinkConnectors(String connectorClassOrAlias) {
Set<PluginDesc<SinkConnector>> sinkConnectors(String connectorClassOrAlias) {
return pluginsOfClass(connectorClassOrAlias, scanResult.sinkConnectors());
}

public Set<PluginDesc<SourceConnector>> sourceConnectors() {
return scanResult.sourceConnectors();
}

public Set<PluginDesc<SourceConnector>> sourceConnectors(String connectorClassOrAlias) {
Set<PluginDesc<SourceConnector>> sourceConnectors(String connectorClassOrAlias) {
return pluginsOfClass(connectorClassOrAlias, scanResult.sourceConnectors());
}

Expand Down Expand Up @@ -357,6 +357,10 @@ private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, Set<Plugi
return plugins;
}

public PluginsRecommenders recommender() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the ROI here for this method, why can't ConnectorConfig call new?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep this as part of Plugins, then we can avoid the new PluginsRecommendors(plugins) call on every validate. Since the plugins information, once scanned remains static we don't really need to initialize all the recommenders again and again. Also, if the plugins can be improved to be mutable in future it would be easier to revoke the current instance of recommenders in plugins and generate the new recommender, and have it reflected across all the uses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep this as part of Plugins, then we can avoid the new PluginsRecommendors(plugins) call on every validate. Since the plugins information, once scanned remains static we don't really need to initialize all the recommenders again and again.

This is premature optimization, please don't do it. When someone has a benchmark that shows the benefit of this, it is trivial to implement.

The constructor of PluginsRecommenders does not perform any substantial pre-computation.
Validation already constructs hundreds of objects (ConfigValues, etc) each time it is called.

Also, if the plugins can be improved to be mutable in future it would be easier to revoke the current instance of recommenders in plugins and generate the new recommender, and have it reflected across all the uses.

YAGNI. When/if we get around to mutable Plugins, this class will look very different.

return new PluginsRecommenders(this);
}

public Object newPlugin(String classOrAlias) throws ClassNotFoundException {
Class<?> klass = pluginClass(delegatingLoader, classOrAlias, Object.class);
return newPlugin(klass);
Expand All @@ -367,6 +371,13 @@ public Object newPlugin(String classOrAlias, VersionRange range) throws Versione
return newPlugin(klass);
}

public <T> Object newPlugin(String classOrAlias, Class<T> baseClass, VersionRange range) throws ClassNotFoundException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bad method addition:

  • The point of passing in a Class<T> is in order to return a T, to avoid a (T) cast in the caller.
  • This method returns Object so it still requires a cast in the caller
  • It does a null check on version, which is already very reasonably handled within the rest of Plugins.
  • The method calls Utils.newInstance which instantiates the plugin with the wrong TCCL

Either change the caller to use newPlugin(String, VersionRange) and perform the blind cast, or change this method to actually handle the casting/type safety and not call Utils.newInstance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to solve this in the best way. The problem I see is Utils.newInstance will try and instantiate with the current classloader, and will fetch the local converter instance, if one such is bundled in with the connector plugin. On the other hand, providing a null version in newPlugins will load the latest version of the plugin amongst all the pluginloaders. We should maintain the existing behavior which uses Utils.newInstance. Is it okay to add something like newPlugin(PluginClassloader, pluginName, version) which will load with the provided class loader, if verison is null or use delegating loader to get a versioned plugin.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just the whole CURRENT_CLASSLOADER vs PLUGINS thing again, right? Didn't we already work this out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it very similar. It requires some changes, but we can use likely the existing ones during validation but newPlugin(PluginClassloader, pluginName, version) is also useful for finding the plugin and its version while setting the defaults for the connector config.

if (range == null) {
return Utils.newInstance(classOrAlias, baseClass);
}
return newPlugin(classOrAlias, range);
}

public Connector newConnector(String connectorClassOrAlias) {
Class<? extends Connector> klass = connectorClass(connectorClassOrAlias);
return newPlugin(klass);
Expand Down
Loading
Loading