-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18182: KIP-891 Connect Multiversion Support (Base PR with Plugin Loading Isolation Changes) #16984
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much for the PR @snehashisp.
I apologize for the lateness of my review. I'm planning on reviewing this daily until the feature freeze on the 11th.
plugin = super.loadClass(fullName, resolve); | ||
// if we are loading a plugin class from the parent classloader, we need to check if the version | ||
// matches the range | ||
String pluginVersion; | ||
try (LoaderSwap classLoader = PluginScanner.withClassLoader(plugin.getClassLoader())) { | ||
pluginVersion = PluginScanner.versionFor(plugin.getDeclaredConstructor().newInstance()); | ||
} catch (ReflectiveOperationException | LinkageError e) { | ||
throw new VersionedPluginLoadingException(String.format( | ||
"Plugin %s was loaded with %s but failed to determine its version", | ||
name, | ||
plugin.getClassLoader() | ||
), e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This information should be computed ahead-of-time by the PluginScanner, not re-computed at runtime each time the class is requested. The parent loader of the DelegatingClassLoader will always be considered "classpath" in PluginDesc<> objects, which also include the version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this part not really required? IIUC the plugins present in classpath are also scanned and part of the pluginLoaders
map, and it should be found with the pluginClassLoader
logic. super.loadClass
should always throw a ClassNotFoundException
. Otherwise, if there are edge cases where super.loadClass does find the class (maybe the plugin is part of the Bootstrap loader) then instantiation the class here is the only way to confirm version requirements are addressed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC the plugins present in classpath are also scanned and part of the pluginLoaders map, and it should be found with the pluginClassLoader logic.
pluginClassLoader()
returns PluginClassLoader
, and has an instanceof check to filter out the classpath plugins.
super.loadClass should always throw a ClassNotFoundException.
I don't know what you mean. If there's a copy of the requested plugin on the classpath, super.loadClass will find it, and it will already have been scanned and put in pluginLoaders with the location being "classpath"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, I missed the instance of check in the pluginClassLoader
. Yes in that case super.loadClass will load the plugin from classpath and we can avoid instantiation to do the version check. Will make the requested changes for this and the following comment.
String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); | ||
// if the plugin is not loaded via the plugin classloader, it might still be available in the parent delegating | ||
// classloader, in order to check if the version satisfies the requirement we need to load the plugin class here | ||
ClassLoader classLoader = loadVersionedPluginClass(fullName, range, false).getClassLoader(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a slight difference here: If the plugin is non-isolated, DelegatingClassLoader#connectorLoader(String)
returns this
(the delegating loader), but DelegatingClassLoader#connectorLoader(String, VersionRange)
returns parent
(the classpath).
For a similar reason, connectorLoader(String)
doesn't throw ClassNotFoundException
, it returns this
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar query here (w.r.t my previous comment). Since the class path plugins are scanned and added to pluginLoaders dict, both should return the classpath loader. Its only if for some reason, a plugin is found that was not scanned we should get a different in the two methods.
I also saw the other comment in the other PR. I have actually replaced the use of connector loader with fetching the connector and using getClass().getClassLoader()
as that avoids two calls. Now in both cases it should return classpath loader for non-isolated plugins based on my understanding. This would mean that instantiation plugins with these classloaders will bypass the delegating loader, which does not look correct, but that seems to be the current process. LMK if I am missing something here. I can check if the class loader here is an instance of PluginClassLoader
and return the delegating classloader here if I am incorrect in my assumption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but that seems to be the current process.
It's the process in your PR, but was not the process before. I'm saying you need to change this to return the DelegatingClassLoader.
.../runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
Show resolved
Hide resolved
.../runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
Outdated
Show resolved
Hide resolved
.../runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
Show resolved
Hide resolved
if (range != null && range.hasRestrictions() && !range.containsVersion(pluginVersion)) { | ||
// this can happen if the current class loader is used | ||
// if there are version restrictions then this should be captured, and we should load using the plugin class loader | ||
if (classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { | ||
return getVersionedPlugin(config, classPropertyName, versionPropertyName, basePluginClass, ClassLoaderUsage.PLUGINS, availablePlugins); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we assert range == null || classLoaderUsage == PLUGINS
earlier, to ensure that we never get into this situation where we loaded a class of an invalid version?
I think having this depend on range.containsVersion(pluginVersion) could lead to some strange behavior in the following situation:
- key.converter.version = [1.0, 2.0)
- Converter v1.0 is on the classpath
- Converter v1.2 is in a different plugin
- Converter v1.1 is in the local plugin
If you don't have 1.1 installed, 1.2 is selected. If you later install 1.1 (an older version), you get rolled back to 1.1.
But if you had specified [1.2] it wouldn't get rolled back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the situation you mentioned is exactly the reason why the public methods to create a converter with a version provided we don't have the option to pass in a class loader and defaults to using the plugins loader, otherwise it can potentially shadow a higher version of a plugin requirement. This code path should ideally never be executed. Will remove this and add the assertion suggestion.
.../runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
Outdated
Show resolved
Hide resolved
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java
Outdated
Show resolved
Hide resolved
Thanks for the reviews @gharris1727. Will take a look soon and try and give my full attention till 11th. What does the feature freeze encompass, does it include all unit testing and ITs or can it come later as part of stabilization? I believe we can make it if it does not include the latter, otherwise we will need to target the following release. Edit: I just realized that I have other commitments next week as well. I'll try my best, but 11th seems a bit ambitious given the scope of work here. |
Earlier is always better, but we can prioritize the changes (with manual testing) before feature freeze, and unit/integration tests after feature freeze.
I'll do my best to support you, please let me know if you see anything that I can help with, or if we need to involve someone else. |
ccfb90b
to
757caba
Compare
A couple of minor adjustments were needed, otherwise the runtime tests have all passed locally. Hoping that it will go through on the CI 🤞. I accidentally pushed in some changes to rest of the files while merging latest trunk (git merge and push from native windoes seems to add a carriage return to the files 😢). If possible can you please remove the additional tags. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much @snehashisp !
The is one of a set of PRs for KIP-891. The list of total PRs given below all build one the previous one in the list. They can be reviewed individually, or if the complete set of changes is preferrable, please refer to the last PR.
This is PR#1 and contains changes to connects plugin loading package. It contains the set of changes to load versioned plugins based on maven versioning as described in the KIP and exposes a set of public methods for use in the other components of this KIP.
Committer Checklist (excluded from commit message)