Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
package io.trino.plugin.base;

import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;

import java.util.Optional;

Expand All @@ -32,10 +32,10 @@ private Versions() {}
* chooses not to maintain compatibility with older SPI versions, as happens for plugins maintained together with
* the Trino project.
*/
public static void checkSpiVersion(ConnectorContext context, ConnectorFactory connectorFactory)
public static void checkSpiVersion(ConnectorContext context, String connectorName, Class<? extends Connector> connectorClass)
Comment thread
findepi marked this conversation as resolved.
Outdated
{
String spiVersion = context.getSpiVersion();
Optional<String> pluginVersion = getPluginMavenVersion(connectorFactory);
Optional<String> pluginVersion = getPluginMavenVersion(connectorClass);

if (pluginVersion.isEmpty()) {
// Assume we're in tests. In tests, plugin version is unknown and SPI version may be known, e.g. when running single module's tests from maven.
Expand All @@ -47,14 +47,14 @@ public static void checkSpiVersion(ConnectorContext context, ConnectorFactory co
format(
"Trino SPI version %s does not match %s connector version %s. The connector cannot be used with SPI version other than it was compiled for.",
spiVersion,
connectorFactory.getName(),
connectorName,
pluginVersion.get()));
}

private static Optional<String> getPluginMavenVersion(ConnectorFactory connectorFactory)
private static Optional<String> getPluginMavenVersion(Class<? extends Connector> connectorClass)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "maven" version? This is `Package.getImplementationVersion()`, whose Javadoc says: "Return the version of this implementation. It consists of any string assigned by the vendor of this implementation and does not have any particular syntax specified or expected by the Java runtime."

I think this is independent of Maven and the name should be changed to not reflect the particular build system (again, this is a plugin-toolkit, expected to be used/understood by consumers who don't use "Maven").

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's Trino code. All Trino plugins use airbase, and this depends on airbase populating the jar's manifest.
A non-Trino (third-party) plugin should probably not call this method.

(BTW, I agree the original change (#11774) could use a better source of information -- the SPI version a connector was compiled against -- instead of the connector's version. However, I didn't find a way to do this without changing each of the connectors. From the Trino project's perspective, it would be unnecessary complexity anyway, since the SPI version is the same as the connector's version.)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually easy to do (if you know how, and luckily there is prior art):

Connectors that want an SPI version match should have a Maven transformation in their pom that writes an attribute to the manifest; if that attribute is present, Trino enforces a match.

And we write tests for the match, and add a flag to disable it (for CI, or for testing upgrades without having to change versions, etc.).

{
String specificationVersion = connectorFactory.getClass().getPackage().getSpecificationVersion();
String implementationVersion = connectorFactory.getClass().getPackage().getImplementationVersion();
String specificationVersion = connectorClass.getPackage().getSpecificationVersion();
String implementationVersion = connectorClass.getPackage().getImplementationVersion();
if (specificationVersion == null && implementationVersion == null) {
// The information comes from jar's Manifest and is not present e.g. when running tests, or from an IDE.
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
requireNonNull(context, "context is null");
checkSpiVersion(context, this);
checkSpiVersion(context, CONNECTOR_NAME, AccumuloConnector.class);

Bootstrap app = new Bootstrap(
new JsonModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
requireNonNull(requiredConfig, "requiredConfig is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), AtopConnector.class);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
requireNonNull(requiredConfig, "requiredConfig is null");
checkSpiVersion(context, this);
checkSpiVersion(context, name, JdbcConnector.class);

Bootstrap app = new Bootstrap(
binder -> binder.bind(TypeManager.class).toInstance(context.getTypeManager()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), BigQueryConnector.class);

Bootstrap app = new Bootstrap(
new JsonModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), BlackHoleConnector.class);

ListeningScheduledExecutorService executorService = listeningDecorator(newSingleThreadScheduledExecutor(daemonThreadsNamed("blackhole")));
return new BlackHoleConnector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), CassandraConnector.class);

Bootstrap app = new Bootstrap(
new MBeanModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), DeltaLakeConnector.class);

ClassLoader classLoader = context.duplicatePluginClassLoader();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), ElasticsearchConnector.class);

Bootstrap app = new Bootstrap(
new MBeanModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
requireNonNull(requiredConfig, "requiredConfig is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), ExampleConnector.class);

// A plugin is not required to use Guice; it is just very convenient
Bootstrap app = new Bootstrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), SheetsConnector.class);

Bootstrap app = new Bootstrap(
new JsonModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), HiveConnector.class);

ClassLoader classLoader = context.duplicatePluginClassLoader();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), IcebergConnector.class);

ClassLoader classLoader = context.duplicatePluginClassLoader();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), JmxConnector.class);

Bootstrap app = new Bootstrap(
new MBeanServerModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), KafkaConnector.class);

Bootstrap app = new Bootstrap(
new CatalogNameModule(catalogName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), KinesisConnector.class);

try {
Bootstrap app = new Bootstrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), KuduConnector.class);

Bootstrap app = new Bootstrap(
new JsonModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), LocalFileConnector.class);

Bootstrap app = new Bootstrap(
binder -> binder.bind(NodeManager.class).toInstance(context.getNodeManager()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
requireNonNull(requiredConfig, "requiredConfig is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), MemoryConnector.class);

// A plugin is not required to use Guice; it is just very convenient
Bootstrap app = new Bootstrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), MongoConnector.class);

Bootstrap app = new Bootstrap(
new JsonModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
requireNonNull(requiredConfig, "requiredConfig is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), PhoenixConnector.class);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
requireNonNull(requiredConfig, "requiredConfig is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), PhoenixConnector.class);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), PinotConnector.class);

ImmutableList.Builder<Module> modulesBuilder = ImmutableList.<Module>builder()
.add(new JsonModule())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public String getName()
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
requireNonNull(requiredConfig, "requiredConfig is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), PrometheusConnector.class);

try {
// A plugin is not required to use Guice; it is just very convenient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), RaptorConnector.class);

Bootstrap app = new Bootstrap(
new CatalogNameModule(catalogName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), RedisConnector.class);

Bootstrap app = new Bootstrap(
new JsonModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), ThriftConnector.class);

Bootstrap app = new Bootstrap(
new MBeanModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.tpcds;

import com.google.common.collect.ImmutableMap;
import io.trino.spi.NodeManager;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
Expand Down Expand Up @@ -58,42 +59,53 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
checkSpiVersion(context, this);
checkSpiVersion(context, getName(), TpcdsConnector.class);
return new TpcdsConnector(config, context);
}

private class TpcdsConnector
implements Connector
{
private final int splitsPerNode;
private final NodeManager nodeManager;
private final Map<String, String> config;

public TpcdsConnector(Map<String, String> config, ConnectorContext context)
{
this.splitsPerNode = getSplitsPerNode(config);
this.nodeManager = context.getNodeManager();
this.config = ImmutableMap.copyOf(config);
}

int splitsPerNode = getSplitsPerNode(config);
NodeManager nodeManager = context.getNodeManager();
return new Connector()
@Override
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
{
@Override
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
{
return TpcdsTransactionHandle.INSTANCE;
}

@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
{
return new TpcdsMetadata();
}

@Override
public ConnectorSplitManager getSplitManager()
{
return new TpcdsSplitManager(nodeManager, splitsPerNode, isWithNoSexism(config));
}

@Override
public ConnectorRecordSetProvider getRecordSetProvider()
{
return new TpcdsRecordSetProvider();
}

@Override
public ConnectorNodePartitioningProvider getNodePartitioningProvider()
{
return new TpcdsNodePartitioningProvider(nodeManager, splitsPerNode);
}
};
return TpcdsTransactionHandle.INSTANCE;
}

@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
{
return new TpcdsMetadata();
}

@Override
public ConnectorSplitManager getSplitManager()
{
return new TpcdsSplitManager(nodeManager, splitsPerNode, isWithNoSexism(config));
}

@Override
public ConnectorRecordSetProvider getRecordSetProvider()
{
return new TpcdsRecordSetProvider();
}

@Override
public ConnectorNodePartitioningProvider getNodePartitioningProvider()
{
return new TpcdsNodePartitioningProvider(nodeManager, splitsPerNode);
}
}

private int getSplitsPerNode(Map<String, String> properties)
Expand Down
Loading