KAFKA-18215: KIP-891 Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) #17741
@snehashisp Thanks for reworking the PluginVersionUtils. I think the current model is much more reasonable.
@@ -98,6 +118,12 @@ public class ConnectorConfig extends AbstractConfig {
            new InstantiableClassValidator()
The KIP includes this detail:
- If the .version property is left empty, or if there are no installed plugins with a matching name, errors will only be attributed to the plugin class property, not to the .version property.
- If a plugin class name is valid (at least one version is installed) but the non-empty .version property doesn't include any of the installed versions, then the error will be attributed to both the .version property and the plugin class property.
I don't think the validators for the class or version configs are doing this right now. Is that part of this PR or a later PR?
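The two attribution rules quoted from the KIP can be sketched roughly as below. This is only an illustration under stated assumptions: `VersionErrorAttribution` and `attributeErrors` are hypothetical names that do not exist in Kafka Connect, installed versions are modeled as plain strings, and the version check is an exact match rather than the version-range matching the real feature uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class VersionErrorAttribution {

    // Hypothetical helper (not a Kafka Connect API): returns, per config key,
    // the errors that the quoted KIP rules would attach to that key.
    public static Map<String, List<String>> attributeErrors(
            String classKey, String className,
            String versionKey, String requestedVersion,
            Map<String, Set<String>> installedVersions) {
        Map<String, List<String>> errors = new LinkedHashMap<>();
        errors.put(classKey, new ArrayList<>());
        errors.put(versionKey, new ArrayList<>());

        Set<String> versions = installedVersions.get(className);
        if (versions == null || versions.isEmpty()) {
            // No installed plugin with a matching name: only the class property gets the error.
            errors.get(classKey).add("No installed plugin named " + className);
            return errors;
        }
        if (requestedVersion == null || requestedVersion.isEmpty()) {
            // Empty version with a valid class: this check attributes nothing to the version property.
            return errors;
        }
        if (!versions.contains(requestedVersion)) {
            // Valid class but no installed version matches: both properties get the error.
            // Assumption: exact string match stands in for real version-range matching.
            String message = "Version " + requestedVersion + " of " + className + " is not installed";
            errors.get(classKey).add(message);
            errors.get(versionKey).add(message);
        }
        return errors;
    }
}
```

Keeping both keys in the result even when they have no errors mirrors the idea that validation returns an entry per config, with errors attached only where the rules say so.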
I have addressed this for converters and transformations (PR # 3), but it's not that straightforward for connectors and connector versions. Validation returns a set of config infos, which combines some predefined ones with ones defined in the connector. If the version is incorrect, then classloading of the connector itself will fail, so it is unclear which set of config infos we should show. If we validate and show the config infos from the latest version of the connector, with the error attributed to the version config, the set of configs returned may not be accurate for the version requested.
One possible way would be to return the common ConnectorConfig config def, but that is not something the API does at the moment. For instance, we could do the same thing for an invalid connector class, but we just throw an exception right now, which is why I have kept it the same here.
> I have addressed this for converters and transformations (PR # 3)
Do you mean transformations and predicates? I see that there's some validation happening there, but I didn't see anything similar for converters in this PR.
> but it's not that straightforward for connector and connector versions
The KIP includes an exception for connector.class; we don't have to provide validation or recommendations for that.
We should provide validation results for connector.plugin.version, because that's going to be the single most useful configuration in this new feature.
Converter errors are added to the config infos here:
kafka/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java, line 471 in b980c03:
pluginConfigValue.addErrorMessage(e.getMessage());
I will see how we can return the validation result with the common connector configuration. This will have the config defs provided in ConnectorConfig but will not have the additional ones in the subclasses or the actual connector.
Thanks for the review @gharris1727. Will get to work on this soon, in my daytime.
@@ -357,6 +357,10 @@ private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, Set<Plugi
        return plugins;
    }

    public PluginsRecommenders recommender() {
> If we keep this as part of Plugins, then we can avoid the new PluginsRecommenders(plugins) call on every validate. Since the plugins information, once scanned, remains static, we don't really need to initialize all the recommenders again and again.
This is premature optimization, please don't do it. When someone has a benchmark that shows the benefit of this, it is trivial to implement.
The constructor of PluginsRecommenders does not perform any substantial pre-computation.
Validation already constructs hundreds of objects (ConfigValues, etc.) each time it is called.
> Also, if the plugins can be made mutable in the future, it would be easier to revoke the current instance of recommenders in plugins, generate the new recommender, and have it reflected across all the uses.
YAGNI. When/if we get around to mutable Plugins, this class will look very different.
VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion);
ClassLoader connectorLoader = plugins.pluginLoader(connectorClass, range);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
    T plugin = (T) plugins.newPlugin(pluginName, pluginClass, null);
    if (plugin instanceof Versioned) {
        return ((Versioned) plugin).version();
    }
}
> If two different versions of the same plugin are bundled under the same plugin classloader
This is impossible from Connect's perspective. Once a ClassLoader returns a Class, it is cached and cannot be redefined. If two versions of a plugin are both present within the same ClassLoader, one will be picked arbitrarily, and it will always be picked (on the local machine). Effectively, Connect can't even know the second class file is there, which is why we don't try to detect it.
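A small sketch of the caching behavior described above: asking one ClassLoader for the same class name twice yields the identical Class object. `ClassLoaderCacheDemo` and `sameClassTwice` are hypothetical names used only for illustration, and the example loads a JDK class rather than a Connect plugin.

```java
public class ClassLoaderCacheDemo {

    // Looks up the same class name twice through one loader and reports
    // whether both lookups produced the identical Class object.
    public static boolean sameClassTwice(ClassLoader loader, String className) {
        try {
            Class<?> first = loader.loadClass(className);
            Class<?> second = loader.loadClass(className);
            return first == second;
        } catch (ClassNotFoundException e) {
            // Wrapped so callers do not need to handle a checked exception.
            throw new IllegalStateException("Class not found: " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        System.out.println(sameClassTwice(loader, "java.util.ArrayList"));
    }
}
```

Because the second lookup resolves to the already-defined class, a second copy of the class file reachable from the same loader is never consulted, which is consistent with the point that Connect cannot observe it.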
Hi @gharris1727. Please take another pass. I have made the requested changes; let me know if I have missed anything. Some tests are failing; I will look into those soon.
There is another unrelated issue I came across while testing this PR. There might be a regression in #16604, which seems to break reflections-based isolation for plugins that are already defined in the classpath. It looks like the new classgraph scanner always loads the classpath plugins even if there is another version in an isolated plugin.path. This causes the runtime to ignore the plugin due to this check. In my setup, I have built the following plugin hierarchy for testing, and I found that the different versions of the JSON converter are never identified because one is present in the classpath.
I changed the debug logs to error in the class and observed the following errors during scanning.
The fix I made was to use
Great catch! That regression hasn't been released yet and would definitely be a blocker for the release. Please open a blocker ticket for that so we can fix it.
I think this is the right fix. It looks like Classgraph by default expects parent-first loaders, while our loaders are child-first. When the same class is defined multiple times, it masks all later copies, which would hide all of the other versions of the plugins.
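The masking effect described above can be illustrated with a toy model. This is not Classgraph's implementation or API: `MaskingSketch` and `firstDefinitionWins` are hypothetical names, and locations are modeled as simple labels mapped to the class names they define, scanned in insertion order.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaskingSketch {

    // Toy model of "first definition wins": locations are visited in the map's
    // iteration order (pass a LinkedHashMap), and a class name already seen in
    // an earlier location masks every later copy.
    public static Map<String, String> firstDefinitionWins(Map<String, List<String>> scanOrder) {
        Map<String, String> winners = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> location : scanOrder.entrySet()) {
            for (String className : location.getValue()) {
                // putIfAbsent keeps the earliest location for each class name.
                winners.putIfAbsent(className, location.getKey());
            }
        }
        return winners;
    }

    public static void main(String[] args) {
        // Assumed layout for illustration: one copy on the classpath, two in plugin directories.
        Map<String, List<String>> parentFirst = new LinkedHashMap<>();
        parentFirst.put("classpath", List.of("org.example.ExampleConverter"));
        parentFirst.put("plugin.path/v1", List.of("org.example.ExampleConverter"));
        parentFirst.put("plugin.path/v2", List.of("org.example.ExampleConverter"));
        System.out.println(firstDefinitionWins(parentFirst));
    }
}
```

With the classpath scanned first, both plugin.path copies are masked, which matches the symptom reported in the thread: only the classpath version of the converter is ever seen.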
Please also fix the build/tests.
@@ -1,149 +1,149 @@
/*
nit: large diff.
How does this happen? I don't see what the change is...
It was happening when I was rolling back changes via the IDE. I think it was somehow adding carriage returns to the lines, since it's on Windows. It does not normally happen if I do the operation with WSL.
Ah yeah the line endings could cause a big diff. Thanks for looking into it!
Created https://issues.apache.org/jira/browse/KAFKA-18211 for the issue.
LGTM, thanks @snehashisp
@@ -82,6 +87,11 @@ public class ConnectorConfig extends AbstractConfig {
            " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
    private static final String CONNECTOR_CLASS_DISPLAY = "Connector class";

    public static final String CONNECTOR_VERSION = "connector." + WorkerConfig.PLUGIN_VERSION_SUFFIX;
@snehashisp Thanks for this great patch. I have one small comment: It seems we are using connector.plugin.version instead of connector.version, correct? If so, could you please update the KIP accordingly?
…idation changes for Connectors and Converters) (apache#17741) Reviewers: Greg Harris <[email protected]>
This is one of a set of PRs for KIP-891. The PRs listed below each build on the previous one in the list. They can be reviewed individually, or if the complete set of changes is preferable, please refer to the last PR.
This is PR#2 and contains changes to the connector-config and connector validation endpoints so that they validate against the version provided and return accurate defaults. This is primarily for connectors and converters; PR#3 is for transformations.