Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorFactory;

import java.util.function.Function;

@ThreadSafe
public interface CatalogFactory
{
void addConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogHandle, ClassLoader> duplicatePluginClassLoaderFactory);
void addConnectorFactory(ConnectorFactory connectorFactory);

CatalogConnector createCatalog(CatalogProperties catalogProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
import io.trino.spi.connector.MetadataProvider;
import io.trino.spi.type.TypeManager;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class ConnectorContextInstance
Expand All @@ -41,8 +37,6 @@ public class ConnectorContextInstance
private final MetadataProvider metadataProvider;
private final PageSorter pageSorter;
private final PageIndexerFactory pageIndexerFactory;
private final Supplier<ClassLoader> duplicatePluginClassLoaderFactory;
private final AtomicBoolean pluginClassLoaderDuplicated = new AtomicBoolean();
private final CatalogHandle catalogHandle;

public ConnectorContextInstance(
Expand All @@ -54,8 +48,7 @@ public ConnectorContextInstance(
TypeManager typeManager,
MetadataProvider metadataProvider,
PageSorter pageSorter,
PageIndexerFactory pageIndexerFactory,
Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
PageIndexerFactory pageIndexerFactory)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
Expand All @@ -65,7 +58,6 @@ public ConnectorContextInstance(
this.metadataProvider = requireNonNull(metadataProvider, "metadataProvider is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.duplicatePluginClassLoaderFactory = requireNonNull(duplicatePluginClassLoaderFactory, "duplicatePluginClassLoaderFactory is null");
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
}

Expand Down Expand Up @@ -122,11 +114,4 @@ public PageIndexerFactory getPageIndexerFactory()
{
return pageIndexerFactory;
}

@Override
public ClassLoader duplicatePluginClassLoader()
{
checkState(!pluginClassLoaderDuplicated.getAndSet(true), "plugin class loader already duplicated");
return duplicatePluginClassLoaderFactory.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public class ConnectorServices
private final Tracer tracer;
private final CatalogHandle catalogHandle;
private final Connector connector;
private final Runnable afterShutdown;
private final Set<SystemTable> systemTables;
private final CatalogProcedures procedures;
private final CatalogTableProcedures tableProcedures;
Expand All @@ -95,12 +94,11 @@ public class ConnectorServices

private final AtomicBoolean shutdown = new AtomicBoolean();

public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector connector, Runnable afterShutdown)
public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector connector)
{
this.tracer = requireNonNull(tracer, "tracer is null");
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.connector = requireNonNull(connector, "connector is null");
this.afterShutdown = requireNonNull(afterShutdown, "afterShutdown is null");

Set<SystemTable> systemTables = connector.getSystemTables();
requireNonNull(systemTables, format("Connector '%s' returned a null system tables set", catalogHandle));
Expand Down Expand Up @@ -345,9 +343,6 @@ public void shutdown()
catch (Throwable t) {
log.error(t, "Error shutting down catalog: %s", catalogHandle);
}
finally {
afterShutdown.run();
}
}

private static void validateTableFunction(ConnectorTableFunction tableFunction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.connector;

import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.node.NodeInfo;
import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -25,11 +24,9 @@
import io.trino.connector.system.SystemConnector;
import io.trino.connector.system.SystemTablesProvider;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.metadata.HandleResolver;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Metadata;
import io.trino.security.AccessControl;
import io.trino.server.PluginClassLoader;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.PageSorter;
import io.trino.spi.VersionEmbedder;
Expand All @@ -46,11 +43,8 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.spi.connector.CatalogHandle.createInformationSchemaCatalogHandle;
import static io.trino.spi.connector.CatalogHandle.createSystemTablesCatalogHandle;
import static java.util.Objects.requireNonNull;
Expand All @@ -61,7 +55,6 @@ public class DefaultCatalogFactory
{
private final Metadata metadata;
private final AccessControl accessControl;
private final HandleResolver handleResolver;

private final InternalNodeManager nodeManager;
private final PageSorter pageSorter;
Expand All @@ -75,13 +68,12 @@ public class DefaultCatalogFactory
private final boolean schedulerIncludeCoordinator;
private final int maxPrefetchedInformationSchemaPrefixes;

private final ConcurrentMap<ConnectorName, InternalConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectorName, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();

@Inject
public DefaultCatalogFactory(
Metadata metadata,
AccessControl accessControl,
HandleResolver handleResolver,
InternalNodeManager nodeManager,
PageSorter pageSorter,
PageIndexerFactory pageIndexerFactory,
Expand All @@ -95,7 +87,6 @@ public DefaultCatalogFactory(
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
Expand All @@ -109,11 +100,10 @@ public DefaultCatalogFactory(
}

@Override
public synchronized void addConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogHandle, ClassLoader> duplicatePluginClassLoaderFactory)
public synchronized void addConnectorFactory(ConnectorFactory connectorFactory)
{
InternalConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(
new ConnectorName(connectorFactory.getName()),
new InternalConnectorFactory(connectorFactory, duplicatePluginClassLoaderFactory));
ConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(
new ConnectorName(connectorFactory.getName()), connectorFactory);
checkArgument(existingConnectorFactory == null, "Connector '%s' is already registered", connectorFactory.getName());
}

Expand All @@ -122,54 +112,43 @@ public CatalogConnector createCatalog(CatalogProperties catalogProperties)
{
requireNonNull(catalogProperties, "catalogProperties is null");

InternalConnectorFactory factory = connectorFactories.get(catalogProperties.getConnectorName());
checkArgument(factory != null, "No factory for connector '%s'. Available factories: %s", catalogProperties.getConnectorName(), connectorFactories.keySet());
ConnectorFactory connectorFactory = connectorFactories.get(catalogProperties.getConnectorName());
checkArgument(connectorFactory != null, "No factory for connector '%s'. Available factories: %s", catalogProperties.getConnectorName(), connectorFactories.keySet());

CatalogClassLoaderSupplier duplicatePluginClassLoaderFactory = new CatalogClassLoaderSupplier(
Connector connector = createConnector(
catalogProperties.getCatalogHandle().getCatalogName(),
catalogProperties.getCatalogHandle(),
factory.getDuplicatePluginClassLoaderFactory(),
handleResolver);
try {
Connector connector = createConnector(
catalogProperties.getCatalogHandle().getCatalogName(),
catalogProperties.getCatalogHandle(),
factory.getConnectorFactory(),
duplicatePluginClassLoaderFactory,
catalogProperties.getProperties());
return createCatalog(
catalogProperties.getCatalogHandle(),
catalogProperties.getConnectorName(),
connector,
duplicatePluginClassLoaderFactory::destroy,
Optional.of(catalogProperties));
}
catch (Throwable e) {
duplicatePluginClassLoaderFactory.destroy();
throw e;
}
connectorFactory,
catalogProperties.getProperties());

return createCatalog(
catalogProperties.getCatalogHandle(),
catalogProperties.getConnectorName(),
connector,
Optional.of(catalogProperties));
}

@Override
public CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector)
{
return createCatalog(catalogHandle, connectorName, connector, () -> {}, Optional.empty());
return createCatalog(catalogHandle, connectorName, connector, Optional.empty());
}

private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector, Runnable destroy, Optional<CatalogProperties> catalogProperties)
private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector, Optional<CatalogProperties> catalogProperties)
{
Tracer tracer = createTracer(catalogHandle);

ConnectorServices catalogConnector = new ConnectorServices(
tracer,
catalogHandle,
connector,
destroy);
ConnectorServices catalogConnector = new ConnectorServices(tracer, catalogHandle, connector);

ConnectorServices informationSchemaConnector = new ConnectorServices(
tracer,
createInformationSchemaCatalogHandle(catalogHandle),
new InformationSchemaConnector(catalogHandle.getCatalogName(), nodeManager, metadata, accessControl, maxPrefetchedInformationSchemaPrefixes),
() -> {});
new InformationSchemaConnector(
catalogHandle.getCatalogName(),
nodeManager,
metadata,
accessControl,
maxPrefetchedInformationSchemaPrefixes));

SystemTablesProvider systemTablesProvider;
if (nodeManager.getCurrentNode().isCoordinator()) {
Expand All @@ -189,8 +168,7 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
new SystemConnector(
nodeManager,
systemTablesProvider,
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogHandle)),
() -> {});
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogHandle)));

return new CatalogConnector(
catalogHandle,
Expand All @@ -205,7 +183,6 @@ private Connector createConnector(
String catalogName,
CatalogHandle catalogHandle,
ConnectorFactory connectorFactory,
Supplier<ClassLoader> duplicatePluginClassLoaderFactory,
Map<String, String> properties)
{
ConnectorContext context = new ConnectorContextInstance(
Expand All @@ -217,8 +194,7 @@ private Connector createConnector(
typeManager,
new InternalMetadataProvider(metadata, typeManager),
pageSorter,
pageIndexerFactory,
duplicatePluginClassLoaderFactory);
pageIndexerFactory);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.create(catalogName, properties, context);
Expand All @@ -229,87 +205,4 @@ private Tracer createTracer(CatalogHandle catalogHandle)
{
return openTelemetry.getTracer("trino.catalog." + catalogHandle.getCatalogName());
}

private static class InternalConnectorFactory
{
private final ConnectorFactory connectorFactory;
private final Function<CatalogHandle, ClassLoader> duplicatePluginClassLoaderFactory;

public InternalConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogHandle, ClassLoader> duplicatePluginClassLoaderFactory)
{
this.connectorFactory = connectorFactory;
this.duplicatePluginClassLoaderFactory = duplicatePluginClassLoaderFactory;
}

public ConnectorFactory getConnectorFactory()
{
return connectorFactory;
}

public Function<CatalogHandle, ClassLoader> getDuplicatePluginClassLoaderFactory()
{
return duplicatePluginClassLoaderFactory;
}

@Override
public String toString()
{
return connectorFactory.getName();
}
}

private static class CatalogClassLoaderSupplier
implements Supplier<ClassLoader>
{
private final CatalogHandle catalogHandle;
private final Function<CatalogHandle, ClassLoader> duplicatePluginClassLoaderFactory;
private final HandleResolver handleResolver;

@GuardedBy("this")
private boolean destroyed;

@GuardedBy("this")
private ClassLoader classLoader;

public CatalogClassLoaderSupplier(
CatalogHandle catalogHandle,
Function<CatalogHandle, ClassLoader> duplicatePluginClassLoaderFactory,
HandleResolver handleResolver)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.duplicatePluginClassLoaderFactory = requireNonNull(duplicatePluginClassLoaderFactory, "duplicatePluginClassLoaderFactory is null");
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
}

@Override
public ClassLoader get()
{
ClassLoader classLoader = duplicatePluginClassLoaderFactory.apply(catalogHandle);

synchronized (this) {
// we check this after class loader creation because it reduces the complexity of the synchronization, and this shouldn't happen
checkState(this.classLoader == null, "class loader is already a duplicated for catalog " + catalogHandle);
checkState(!destroyed, "catalog has been shutdown");
this.classLoader = classLoader;
}

if (classLoader instanceof PluginClassLoader) {
handleResolver.registerClassLoader((PluginClassLoader) classLoader);
}
return classLoader;
}

public void destroy()
{
ClassLoader classLoader;
synchronized (this) {
checkState(!destroyed, "catalog has been shutdown");
classLoader = this.classLoader;
destroyed = true;
}
if (classLoader instanceof PluginClassLoader) {
handleResolver.unregisterClassLoader((PluginClassLoader) classLoader);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.trino.spi.connector.ConnectorFactory;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;

Expand All @@ -33,10 +32,9 @@ public void setCatalogFactory(CatalogFactory catalogFactory)
}

@Override
public void addConnectorFactory(ConnectorFactory connectorFactory,
Function<CatalogHandle, ClassLoader> duplicatePluginClassLoaderFactory)
public void addConnectorFactory(ConnectorFactory connectorFactory)
{
getDelegate().addConnectorFactory(connectorFactory, duplicatePluginClassLoaderFactory);
getDelegate().addConnectorFactory(connectorFactory);
}

@Override
Expand Down
Loading