Skip to content

Commit

Permalink
KAFKA-18215: KIP-891 Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) (apache#17741)
Browse files Browse the repository at this point in the history

Reviewers: Greg Harris <[email protected]>
  • Loading branch information
snehashisp authored and peterxcli committed Dec 18, 2024
1 parent 5b58a3f commit fd6ccdb
Show file tree
Hide file tree
Showing 20 changed files with 916 additions and 266 deletions.
2 changes: 1 addition & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@
<allow pkg="org.apache.kafka.connect" />
<allow pkg="io.github.classgraph"/>
<allow pkg="javax.crypto"/>
<allow pkg="org.apache.maven.artifact.versioning" />
<allow pkg="org.eclipse.jetty.util" />
<allow pkg="org.apache.log4j" />

Expand All @@ -583,7 +584,6 @@

<subpackage name="isolation">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.maven.artifact.versioning" />
<allow pkg="javax.tools" />
</subpackage>

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;

import org.apache.maven.artifact.versioning.VersionRange;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A cache of instantiated {@link Connector} objects keyed by connector class name (or alias) and
 * requested version range. Successful instantiations are cached for reuse, and failed
 * instantiations are also cached so repeated requests for a known-bad connector or version fail
 * fast without re-invoking the plugin loading machinery.
 *
 * <p>All maps are {@link ConcurrentHashMap}s; concurrent calls are safe, though two threads racing
 * on the same uncached connector may each instantiate it once (the cache then keeps one of them).
 */
public class CachedConnectors {

    // Cache key used when no version range is requested (range == null).
    private static final String LATEST_VERSION = "latest";

    // connector name -> (version key -> instantiated connector)
    private final Map<String, Map<String, Connector>> connectors;
    // connector names whose instantiation failed for a reason unrelated to the requested version
    private final Map<String, Exception> invalidConnectors;
    // connector name -> (version key -> version-specific loading failure)
    private final Map<String, Map<String, Exception>> invalidVersions;
    private final Plugins plugins;

    public CachedConnectors(Plugins plugins) {
        this.plugins = plugins;
        this.connectors = new ConcurrentHashMap<>();
        this.invalidConnectors = new ConcurrentHashMap<>();
        this.invalidVersions = new ConcurrentHashMap<>();
    }

    // Derives the cache key for a version range; a null range means "latest".
    private static String versionKey(VersionRange range) {
        return range == null ? LATEST_VERSION : range.toString();
    }

    /**
     * Fails fast if a previous attempt to load this connector (or this specific version of it)
     * already failed. The cached failure is wrapped in a new {@link Exception} so each caller
     * gets a fresh stack trace with the original failure as the cause.
     *
     * @throws Exception if the connector or the requested version is known to be invalid
     */
    private void validate(String connectorName, VersionRange range) throws Exception {
        // Single get() instead of containsKey()+get(): one lookup, and atomic on a ConcurrentHashMap.
        Exception connectorFailure = invalidConnectors.get(connectorName);
        if (connectorFailure != null) {
            throw new Exception(connectorFailure);
        }

        Map<String, Exception> versionFailures = invalidVersions.get(connectorName);
        if (versionFailures != null) {
            Exception versionFailure = versionFailures.get(versionKey(range));
            if (versionFailure != null) {
                throw new Exception(versionFailure);
            }
        }
    }

    /**
     * Returns the cached connector for the given name and version, instantiating and caching it on
     * a miss. Instantiation failures are recorded (version-specific ones separately from general
     * ones) before being rethrown.
     */
    private Connector lookup(String connectorName, VersionRange range) throws Exception {
        String version = versionKey(range);
        Map<String, Connector> byVersion = connectors.get(connectorName);
        if (byVersion != null) {
            Connector cached = byVersion.get(version);
            if (cached != null) {
                return cached;
            }
        }

        try {
            Connector connector = plugins.newConnector(connectorName, range);
            connectors.computeIfAbsent(connectorName, k -> new ConcurrentHashMap<>()).put(version, connector);
            return connector;
        } catch (VersionedPluginLoadingException e) {
            // The connector exists but the requested version could not be satisfied.
            invalidVersions.computeIfAbsent(connectorName, k -> new ConcurrentHashMap<>()).put(version, e);
            throw e;
        } catch (Exception e) {
            // Any other failure marks the connector itself as invalid, regardless of version.
            invalidConnectors.put(connectorName, e);
            throw e;
        }
    }

    /**
     * Gets (instantiating if necessary) the connector with the given name at the given version.
     *
     * @param connectorName connector class name or alias
     * @param range         requested version range, or null for the latest version
     * @return the connector instance
     * @throws Exception if the connector cannot be instantiated, or a prior attempt already failed
     */
    public Connector getConnector(String connectorName, VersionRange range) throws Exception {
        validate(connectorName, range);
        return lookup(connectorName, range);
    }
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;

import org.apache.maven.artifact.versioning.VersionRange;

import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -322,6 +324,8 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
*/
List<ConfigKeyInfo> connectorPluginConfig(String pluginName);

List<ConfigKeyInfo> connectorPluginConfig(String pluginName, VersionRange version);

/**
* Get the current offsets for a connector.
* @param connName the name of the connector whose offsets are to be retrieved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,29 @@ public class SinkConnectorConfig extends ConnectorConfig {
"keys, all error context header keys will start with <code>__connect.errors.</code>";
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";

static final ConfigDef CONFIG = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
private static ConfigDef configDef(ConfigDef baseConfigs) {
return baseConfigs
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
}

public static ConfigDef configDef() {
return CONFIG;
return configDef(ConnectorConfig.configDef());
}

public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, WorkerConfig workerConfig) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connProps, workerConfig));
}

public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connectorClass));
}

public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
super(plugins, CONFIG, props);
super(plugins, configDef(), props);
}

/**
Expand Down Expand Up @@ -206,6 +216,6 @@ public boolean enableErrantRecordReporter() {
}

public static void main(String[] args) {
System.out.println(CONFIG.toHtml(4, config -> "sinkconnectorconfigs_" + config));
System.out.println(configDef().toHtml(4, config -> "sinkconnectorconfigs_" + config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ private static class EnrichedSourceConnectorConfig extends ConnectorConfig {
private final EnrichedSourceConnectorConfig enrichedSourceConfig;
private final String offsetsTopic;

public static ConfigDef configDef() {
private static ConfigDef configDef(ConfigDef baseConfigDef) {
ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
int orderInGroup = 0;
return new ConfigDef(ConnectorConfig.configDef())
return new ConfigDef(baseConfigDef)
.define(
TOPIC_CREATION_GROUPS_CONFIG,
ConfigDef.Type.LIST,
Expand Down Expand Up @@ -203,6 +203,18 @@ public static ConfigDef configDef() {
OFFSETS_TOPIC_DISPLAY);
}

public static ConfigDef configDef() {
return configDef(ConnectorConfig.configDef());
}

public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, WorkerConfig workerConfig) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connProps, workerConfig));
}

public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
return configDef(ConnectorConfig.enrichedConfigDef(plugins, connectorClass));
}

public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
String defaultGroup = "default";
ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ public void stop() {
ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}

public WorkerConfig config() {
return config;
}

/**
* Start a connector managed by this worker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,30 @@ public class WorkerConfig extends AbstractConfig {
public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;

public static final String PLUGIN_VERSION_SUFFIX = "plugin.version";

public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
public static final String KEY_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the keys in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro.";

public static final String KEY_CONVERTER_VERSION = "key.converter." + PLUGIN_VERSION_SUFFIX;
public static final String KEY_CONVERTER_VERSION_DEFAULT = null;
public static final String KEY_CONVERTER_VERSION_DOC = "Version of the key converter.";

public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the values in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro.";

public static final String VALUE_CONVERTER_VERSION = "value.converter." + PLUGIN_VERSION_SUFFIX;
public static final String VALUE_CONVERTER_VERSION_DEFAULT = null;
public static final String VALUE_CONVERTER_VERSION_DOC = "Version of the value converter.";

public static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
public static final String HEADER_CONVERTER_CLASS_DOC =
"HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
Expand All @@ -93,6 +103,10 @@ public class WorkerConfig extends AbstractConfig {
" header values to strings and deserialize them by inferring the schemas.";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();

public static final String HEADER_CONVERTER_VERSION = "header.converter." + PLUGIN_VERSION_SUFFIX;
public static final String HEADER_CONVERTER_VERSION_DEFAULT = null;
public static final String HEADER_CONVERTER_VERSION_DOC = "Version of the header converter.";

public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
Expand Down Expand Up @@ -200,8 +214,12 @@ protected static ConfigDef baseConfigDef() {
CLIENT_DNS_LOOKUP_DOC)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(KEY_CONVERTER_VERSION, Type.STRING,
KEY_CONVERTER_VERSION_DEFAULT, Importance.LOW, KEY_CONVERTER_VERSION_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_VERSION, Type.STRING,
VALUE_CONVERTER_VERSION_DEFAULT, Importance.LOW, VALUE_CONVERTER_VERSION_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
Expand Down Expand Up @@ -237,6 +255,8 @@ protected static ConfigDef baseConfigDef() {
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
.define(HEADER_CONVERTER_VERSION, Type.STRING,
HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW, HEADER_CONVERTER_VERSION_DOC)
.define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
Collections.emptyList(),
Importance.LOW, CONFIG_PROVIDERS_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,23 @@ String latestVersion(String classOrAlias) {
return inner.lastKey().version();
}

/**
 * Returns the version of the named plugin that was loaded from the given location, or
 * {@code null} if the plugin is unknown or no scanned copy of it came from that location.
 */
String versionInLocation(String classOrAlias, String location) {
    if (classOrAlias == null) {
        return null;
    }
    // Resolve a possible alias to the fully-qualified plugin class name before the lookup.
    String pluginClassName = aliases.getOrDefault(classOrAlias, classOrAlias);
    SortedMap<PluginDesc<?>, ClassLoader> loadersByDesc = pluginLoaders.get(pluginClassName);
    if (loadersByDesc == null) {
        return null;
    }
    // First (in sorted order) scanned copy of this plugin whose location matches wins.
    return loadersByDesc.keySet().stream()
            .filter(desc -> desc.location().equals(location))
            .map(PluginDesc::version)
            .findFirst()
            .orElse(null);
}

private ClassLoader findPluginLoader(
SortedMap<PluginDesc<?>, ClassLoader> loaders,
String pluginName,
Expand Down Expand Up @@ -226,8 +243,12 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
));
}

List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream()
// if a plugin implements two interfaces (like JsonConverter implements both converter and header converter)
// it will have two entries under classpath, one for each scan. Hence, we count distinct by version.
List<String> classpathPlugins = scannedPlugin.keySet().stream()
.filter(pluginDesc -> pluginDesc.location().equals("classpath"))
.map(PluginDesc::version)
.distinct()
.collect(Collectors.toList());

if (classpathPlugins.size() > 1) {
Expand All @@ -239,7 +260,7 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
} else if (classpathPlugins.isEmpty()) {
throw new VersionedPluginLoadingException("Invalid plugin found in classpath");
} else {
pluginVersion = classpathPlugins.get(0).version();
pluginVersion = classpathPlugins.get(0);
if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the required version range %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public String toString() {
", location='" + location + '\'' +
'}';
}

@JsonIgnore
DefaultArtifactVersion encodedVersion() {
public DefaultArtifactVersion encodedVersion() {
return encodedVersion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,20 +493,21 @@ private static Collection<URL> distinctUrls(Collection<URL> urls) {
}
return distinct.values();
}

public static VersionRange connectorVersionRequirement(String version) throws InvalidVersionSpecificationException {
if (version == null || version.equals("latest")) {
return null;
}
version = version.trim();

// check first if the given version is valid
VersionRange.createFromVersionSpec(version);
VersionRange range = VersionRange.createFromVersionSpec(version);

// now if the version is not enclosed we consider it as a hard requirement and enclose it in []
if (!version.startsWith("[") && !version.startsWith("(")) {
version = "[" + version + "]";
if (range.hasRestrictions()) {
return range;
}
// now if the version is not enclosed we consider it as a hard requirement and enclose it in []
version = "[" + version + "]";
return VersionRange.createFromVersionSpec(version);
}

}
Loading

0 comments on commit fd6ccdb

Please sign in to comment.