Skip to content

Commit

Permalink
KAFKA-18182: KIP-891 Add VersionRange to Plugins and DelegatingClassL…
Browse files Browse the repository at this point in the history
…oader APIs (#16984)

Reviewers: Greg Harris <[email protected]>
  • Loading branch information
snehashisp authored Dec 7, 2024
1 parent eedd9cd commit af0054b
Show file tree
Hide file tree
Showing 7 changed files with 416 additions and 118 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3518,6 +3518,7 @@ project(':connect:runtime') {

implementation libs.slf4jApi
implementation libs.reload4j
implementation libs.slf4jReload4j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonAnnotations
implementation libs.jacksonJaxrsJsonProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.maven.artifact.versioning.ArtifactVersion;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* A custom classloader dedicated to loading Connect plugin classes in classloading isolation.
Expand Down Expand Up @@ -69,36 +75,108 @@ public DelegatingClassLoader() {

/**
* Retrieve the PluginClassLoader associated with a plugin class
*
* @param name The fully qualified class name of the plugin
* @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
*/
// VisibleForTesting
PluginClassLoader pluginClassLoader(String name) {
PluginClassLoader pluginClassLoader(String name, VersionRange range) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}

SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
if (inner == null) {
return null;
}
ClassLoader pluginLoader = inner.get(inner.lastKey());


ClassLoader pluginLoader = findPluginLoader(inner, name, range);
return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
? (PluginClassLoader) pluginLoader
: null;
}

ClassLoader connectorLoader(String connectorClassOrAlias) {
String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias);
ClassLoader classLoader = pluginClassLoader(fullName);
if (classLoader == null) classLoader = this;
PluginClassLoader pluginClassLoader(String name) {
return pluginClassLoader(name, null);
}

ClassLoader loader(String classOrAlias, VersionRange range) {
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
ClassLoader classLoader = pluginClassLoader(fullName, range);
if (classLoader == null) {
classLoader = this;
}
log.debug(
"Getting plugin class loader: '{}' for connector: {}",
classLoader,
connectorClassOrAlias
"Got plugin class loader: '{}' for connector: {}",
classLoader,
classOrAlias
);
return classLoader;
}

ClassLoader loader(String classOrAlias) {
return loader(classOrAlias, null);
}

ClassLoader connectorLoader(String connectorClassOrAlias) {
return loader(connectorClassOrAlias);
}

String resolveFullClassName(String classOrAlias) {
return aliases.getOrDefault(classOrAlias, classOrAlias);
}

String latestVersion(String classOrAlias) {
if (classOrAlias == null) {
return null;
}
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
return null;
}
return inner.lastKey().version();
}

private ClassLoader findPluginLoader(
SortedMap<PluginDesc<?>, ClassLoader> loaders,
String pluginName,
VersionRange range
) {

if (range != null) {

if (null != range.getRecommendedVersion()) {
throw new VersionedPluginLoadingException(String.format("A soft version range is not supported for plugin loading, "
+ "this is an internal error as connect should automatically convert soft ranges to hard ranges. "
+ "Provided soft version: %s ", range));
}

ArtifactVersion version = null;
ClassLoader loader = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) {
// the entries should be in sorted order of versions so this should end up picking the latest version which matches the range
if (range.containsVersion(entry.getKey().encodedVersion())) {
loader = entry.getValue();
}
}

if (loader == null) {
List<String> availableVersions = loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
throw new VersionedPluginLoadingException(String.format(
"Plugin %s not found that matches the version range %s, available versions: %s",
pluginName,
range,
availableVersions
), availableVersions);
}
return loader;
}

return loaders.get(loaders.lastKey());
}

public void installDiscoveredPlugins(PluginScanResult scanResult) {
pluginLoaders.putAll(computePluginLoaders(scanResult));
for (String pluginClassName : pluginLoaders.keySet()) {
Expand All @@ -112,21 +190,72 @@ public void installDiscoveredPlugins(PluginScanResult scanResult) {

@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
return loadVersionedPluginClass(name, null, resolve);
}

protected Class<?> loadVersionedPluginClass(
String name,
VersionRange range,
boolean resolve
) throws VersionedPluginLoadingException, ClassNotFoundException {

String fullName = aliases.getOrDefault(name, name);
PluginClassLoader pluginLoader = pluginClassLoader(fullName);
PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
Class<?> plugin;
if (pluginLoader != null) {
log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
return pluginLoader.loadClass(fullName, resolve);
log.trace("Retrieving loaded class '{}' from '{}'", name, pluginLoader);
plugin = pluginLoader.loadClass(fullName, resolve);
} else {
plugin = super.loadClass(fullName, resolve);
if (range == null) {
return plugin;
}
verifyClasspathVersionedPlugin(name, plugin, range);
}
return plugin;
}

private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, VersionRange range) throws VersionedPluginLoadingException {
String pluginVersion;
SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(name);

if (scannedPlugin == null) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)",
name
));
}

return super.loadClass(fullName, resolve);
List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream()
.filter(pluginDesc -> pluginDesc.location().equals("classpath"))
.collect(Collectors.toList());

if (classpathPlugins.size() > 1) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has multiple versions specified in class path, "
+ "only one version is allowed in class path for loading a plugin with version range",
name
));
} else if (classpathPlugins.isEmpty()) {
throw new VersionedPluginLoadingException("Invalid plugin found in classpath");
} else {
pluginVersion = classpathPlugins.get(0).version();
if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the required version range %s",
name,
pluginVersion,
range
), Collections.singletonList(pluginVersion));
}
}
}

private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> computePluginLoaders(PluginScanResult plugins) {
Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new HashMap<>();
plugins.forEach(pluginDesc ->
pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>())
.put(pluginDesc, pluginDesc.loader()));
pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>())
.put(pluginDesc, pluginDesc.loader()));
return pluginLoaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
Expand Down Expand Up @@ -59,6 +60,10 @@ public String toString() {
", location='" + location + '\'' +
'}';
}
@JsonIgnore
DefaultArtifactVersion encodedVersion() {
return encodedVersion;
}

public Class<? extends T> pluginClass() {
return klass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -469,7 +471,7 @@ private static Collection<URL> forJavaClassPath() {
}
return distinctUrls(urls);
}

private static Collection<URL> forClassLoader(ClassLoader classLoader) {
final Collection<URL> result = new ArrayList<>();
while (classLoader != null) {
Expand All @@ -483,12 +485,28 @@ private static Collection<URL> forClassLoader(ClassLoader classLoader) {
}
return distinctUrls(result);
}

private static Collection<URL> distinctUrls(Collection<URL> urls) {
Map<String, URL> distinct = new HashMap<>(urls.size());
for (URL url : urls) {
distinct.put(url.toExternalForm(), url);
}
return distinct.values();
}
public static VersionRange connectorVersionRequirement(String version) throws InvalidVersionSpecificationException {
if (version == null || version.equals("latest")) {
return null;
}
version = version.trim();

// check first if the given version is valid
VersionRange.createFromVersionSpec(version);

// now if the version is not enclosed we consider it as a hard requirement and enclose it in []
if (!version.startsWith("[") && !version.startsWith("(")) {
version = "[" + version + "]";
}
return VersionRange.createFromVersionSpec(version);
}

}
Loading

0 comments on commit af0054b

Please sign in to comment.