Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 94 additions & 37 deletions core/trino-main/src/main/java/io/trino/connector/ConnectorManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.trino.metadata.TableProceduresRegistry;
import io.trino.metadata.TablePropertyManager;
import io.trino.security.AccessControlManager;
import io.trino.server.PluginClassLoader;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.PageSorter;
import io.trino.spi.VersionEmbedder;
Expand All @@ -49,7 +50,6 @@
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorHandleResolver;
import io.trino.spi.connector.ConnectorIndexProvider;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
Expand Down Expand Up @@ -203,14 +203,8 @@ public synchronized void stop()
return;
}

for (Map.Entry<CatalogName, MaterializedConnector> entry : connectors.entrySet()) {
Connector connector = entry.getValue().getConnector();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(connector.getClass().getClassLoader())) {
connector.shutdown();
}
catch (Throwable t) {
log.error(t, "Error shutting down connector: %s", entry.getKey());
}
for (MaterializedConnector connector : connectors.values()) {
connector.shutdown();
}
}

Expand Down Expand Up @@ -252,19 +246,16 @@ private synchronized CatalogName createCatalog(String catalogName, String connec
private synchronized void createCatalog(CatalogName catalogName, String connectorName, InternalConnectorFactory factory, Map<String, String> properties)
{
// create all connectors before adding, so a broken connector does not leave the system half updated
MaterializedConnector connector = new MaterializedConnector(catalogName, createConnector(catalogName, factory, properties));

Set<Class<?>> handleClasses = connector.getConnector().getHandleClasses();
if (handleClasses.isEmpty()) {
handleClasses = connector.getConnector().getHandleResolver()
.or(() -> Optional.ofNullable(factory.getConnectorFactory().getHandleResolver()))
.map(ConnectorHandleResolver::getHandleClasses)
.orElseThrow(() -> new IllegalArgumentException(format("Connector %s does not have a handle resolver", factory)));
}
CatalogClassLoaderSupplier duplicatePluginClassLoaderFactory = new CatalogClassLoaderSupplier(catalogName, factory.getDuplicatePluginClassLoaderFactory(), handleResolver);
MaterializedConnector connector = new MaterializedConnector(
catalogName,
createConnector(catalogName, factory.getConnectorFactory(), duplicatePluginClassLoaderFactory, properties),
duplicatePluginClassLoaderFactory::destroy);

MaterializedConnector informationSchemaConnector = new MaterializedConnector(
createInformationSchemaCatalogName(catalogName),
new InformationSchemaConnector(catalogName.getCatalogName(), nodeManager, metadataManager, accessControlManager));
new InformationSchemaConnector(catalogName.getCatalogName(), nodeManager, metadataManager, accessControlManager),
() -> {});

CatalogName systemId = createSystemTablesCatalogName(catalogName);
SystemTablesProvider systemTablesProvider;
Expand All @@ -283,7 +274,8 @@ private synchronized void createCatalog(CatalogName catalogName, String connecto
MaterializedConnector systemConnector = new MaterializedConnector(systemId, new SystemConnector(
nodeManager,
systemTablesProvider,
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogName)));
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogName)),
() -> {});

SecurityManagement securityManagement = connector.getAccessControl().isPresent() ? CONNECTOR : SYSTEM;

Expand All @@ -303,10 +295,8 @@ private synchronized void createCatalog(CatalogName catalogName, String connecto
addConnectorInternal(informationSchemaConnector);
addConnectorInternal(systemConnector);
catalogManager.registerCatalog(catalog);
handleResolver.addCatalogHandleClasses(catalogName.getCatalogName(), handleClasses);
}
catch (Throwable e) {
handleResolver.removeCatalogHandleClasses(catalogName.getCatalogName());
catalogManager.removeCatalog(catalog.getCatalogName());
removeConnectorInternal(systemConnector.getCatalogName());
removeConnectorInternal(informationSchemaConnector.getCatalogName());
Expand Down Expand Up @@ -367,7 +357,6 @@ public synchronized void dropConnection(String catalogName)
removeConnectorInternal(catalog);
removeConnectorInternal(createInformationSchemaCatalogName(catalog));
removeConnectorInternal(createSystemTablesCatalogName(catalog));
handleResolver.removeCatalogHandleClasses(catalogName);
});
}

Expand All @@ -391,17 +380,15 @@ private synchronized void removeConnectorInternal(CatalogName catalogName)

MaterializedConnector materializedConnector = connectors.remove(catalogName);
if (materializedConnector != null) {
Connector connector = materializedConnector.getConnector();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(connector.getClass().getClassLoader())) {
connector.shutdown();
}
catch (Throwable t) {
log.error(t, "Error shutting down connector: %s", catalogName);
}
materializedConnector.shutdown();
}
}

private Connector createConnector(CatalogName catalogName, InternalConnectorFactory factory, Map<String, String> properties)
private Connector createConnector(
CatalogName catalogName,
ConnectorFactory connectorFactory,
Supplier<ClassLoader> duplicatePluginClassLoaderFactory,
Map<String, String> properties)
{
ConnectorContext context = new ConnectorContextInstance(
new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), catalogName, schedulerIncludeCoordinator),
Expand All @@ -410,10 +397,10 @@ private Connector createConnector(CatalogName catalogName, InternalConnectorFact
new InternalMetadataProvider(metadataManager, typeManager),
pageSorter,
pageIndexerFactory,
factory.getDuplicatePluginClassLoaderFactory(catalogName));
duplicatePluginClassLoaderFactory);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getConnectorFactory().getClass().getClassLoader())) {
return factory.getConnectorFactory().create(catalogName.getCatalogName(), properties, context);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
return connectorFactory.create(catalogName.getCatalogName(), properties, context);
}
}

Expand All @@ -433,9 +420,9 @@ public ConnectorFactory getConnectorFactory()
return connectorFactory;
}

public Supplier<ClassLoader> getDuplicatePluginClassLoaderFactory(CatalogName catalogName)
public Function<CatalogName, ClassLoader> getDuplicatePluginClassLoaderFactory()
{
return () -> duplicatePluginClassLoaderFactory.apply(catalogName);
return duplicatePluginClassLoaderFactory;
}

@Override
Expand All @@ -445,10 +432,66 @@ public String toString()
}
}

private static class CatalogClassLoaderSupplier
implements Supplier<ClassLoader>
{
private final CatalogName catalogName;
private final Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory;
private final HandleResolver handleResolver;

@GuardedBy("this")
private boolean destroyed;

@GuardedBy("this")
private ClassLoader classLoader;

public CatalogClassLoaderSupplier(
CatalogName catalogName,
Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory,
HandleResolver handleResolver)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.duplicatePluginClassLoaderFactory = requireNonNull(duplicatePluginClassLoaderFactory, "duplicatePluginClassLoaderFactory is null");
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
}

@Override
public ClassLoader get()
{
ClassLoader classLoader = duplicatePluginClassLoaderFactory.apply(catalogName);

synchronized (this) {
// we check this after class loader creation because it reduces the complexity of the synchronization, and this shouldn't happen
checkState(this.classLoader == null, "class loader is already a duplicated for catalog " + catalogName);
checkState(!destroyed, "catalog has been shutdown");
this.classLoader = classLoader;
}

if (classLoader instanceof PluginClassLoader) {
handleResolver.registerClassLoader((PluginClassLoader) classLoader);
}
return classLoader;
}

public void destroy()
{
ClassLoader classLoader;
synchronized (this) {
checkState(!destroyed, "catalog has been shutdown");
classLoader = this.classLoader;
destroyed = true;
}
if (classLoader instanceof PluginClassLoader) {
handleResolver.unregisterClassLoader((PluginClassLoader) classLoader);
}
}
}

private static class MaterializedConnector
{
private final CatalogName catalogName;
private final Connector connector;
private final Runnable afterShutdown;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;
Expand All @@ -466,10 +509,11 @@ private static class MaterializedConnector
private final List<PropertyMetadata<?>> columnProperties;
private final List<PropertyMetadata<?>> analyzeProperties;

public MaterializedConnector(CatalogName catalogName, Connector connector)
public MaterializedConnector(CatalogName catalogName, Connector connector, Runnable afterShutdown)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.connector = requireNonNull(connector, "connector is null");
this.afterShutdown = requireNonNull(afterShutdown, "afterShutdown is null");

Set<SystemTable> systemTables = connector.getSystemTables();
requireNonNull(systemTables, format("Connector '%s' returned a null system tables set", catalogName));
Expand Down Expand Up @@ -662,5 +706,18 @@ public List<PropertyMetadata<?>> getAnalyzeProperties()
{
return analyzeProperties;
}

public void shutdown()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(connector.getClass().getClassLoader())) {
connector.shutdown();
}
catch (Throwable t) {
log.error(t, "Error shutting down connector: %s", catalogName);
}
finally {
afterShutdown.run();
}
}
}
}
104 changes: 32 additions & 72 deletions core/trino-main/src/main/java/io/trino/metadata/HandleResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,106 +13,66 @@
*/
package io.trino.metadata;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multiset;
import io.trino.connector.informationschema.InformationSchemaColumnHandle;
import io.trino.connector.informationschema.InformationSchemaSplit;
import io.trino.connector.informationschema.InformationSchemaTableHandle;
import io.trino.connector.informationschema.InformationSchemaTransactionHandle;
import io.trino.connector.system.SystemColumnHandle;
import io.trino.connector.system.SystemSplit;
import io.trino.connector.system.SystemTableHandle;
import io.trino.connector.system.SystemTransactionHandle;
import io.trino.server.PluginClassLoader;
import io.trino.split.EmptySplit;
import io.trino.split.RemoteSplit;
import io.trino.sql.planner.SystemPartitioningHandle;
import oshi.annotation.concurrent.ThreadSafe;

import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public final class HandleResolver
{
@GuardedBy("this")
private final Map<String, Set<Class<?>>> catalogHandleClasses = new HashMap<>();
@GuardedBy("this")
private final Multiset<Class<?>> allHandleClasses = HashMultiset.create();
@GuardedBy("this")
private final BiMap<Class<?>, String> handleClassIds = HashBiMap.create();
private final Map<String, ClassLoader> classLoaders = new ConcurrentHashMap<>();

@Inject
public HandleResolver()
{
addCatalogHandleClasses(REMOTE_CONNECTOR_ID.toString(), ImmutableSet.<Class<?>>builder()
.add(RemoteSplit.class)
.add(RemoteTransactionHandle.class)
.add(SystemPartitioningHandle.class)
.build());
addCatalogHandleClasses("$system", ImmutableSet.<Class<?>>builder()
.add(SystemTableHandle.class)
.add(SystemColumnHandle.class)
.add(SystemSplit.class)
.add(SystemTransactionHandle.class)
.build());
addCatalogHandleClasses("$info_schema", ImmutableSet.<Class<?>>builder()
.add(InformationSchemaTableHandle.class)
.add(InformationSchemaColumnHandle.class)
.add(InformationSchemaSplit.class)
.add(InformationSchemaTransactionHandle.class)
.build());
addCatalogHandleClasses("$empty", ImmutableSet.<Class<?>>builder()
.add(EmptySplit.class)
.build());
classLoaders.put("system", getClass().getClassLoader());
}

public synchronized void addCatalogHandleClasses(String catalogName, Set<Class<?>> handleClasses)
public void registerClassLoader(PluginClassLoader classLoader)
{
requireNonNull(catalogName, "catalogName is null");
requireNonNull(handleClasses, "handleClasses is null");
Set<Class<?>> existing = catalogHandleClasses.putIfAbsent(catalogName, ImmutableSet.copyOf(handleClasses));
checkState(existing == null, "Catalog is already registered: %s", catalogName);
for (Class<?> handleClass : handleClasses) {
allHandleClasses.add(handleClass);
handleClassIds.put(handleClass, classId(handleClass));
}
ClassLoader existingClassLoader = classLoaders.putIfAbsent(classLoader.getId(), classLoader);
checkState(existingClassLoader == null, "Class loader already registered: %s", classLoader.getId());
}

public synchronized void removeCatalogHandleClasses(String catalogName)
public void unregisterClassLoader(PluginClassLoader classLoader)
{
Set<Class<?>> classes = catalogHandleClasses.remove(catalogName);
checkState(classes != null, "Catalog not registered: %s", catalogName);
for (Class<?> handleClass : classes) {
if (allHandleClasses.remove(handleClass, 1) == 1) {
handleClassIds.remove(handleClass);
}
}
boolean result = classLoaders.remove(classLoader.getId(), classLoader);
checkState(result, "Class loader not registered: %s", classLoader.getId());
}

public synchronized String getId(Object tableHandle)
@SuppressWarnings("MethodMayBeStatic")
public String getId(Object tableHandle)
{
Class<?> handleClass = tableHandle.getClass();
String id = handleClassIds.get(handleClass);
checkArgument(id != null, "Handle class not registered: " + handleClass.getName());
return id;
return classId(handleClass);
}

public synchronized Class<?> getHandleClass(String id)
public Class<?> getHandleClass(String id)
{
Class<?> handleClass = handleClassIds.inverse().get(id);
checkArgument(handleClass != null, "Handle ID not found: " + id);
return handleClass;
int splitPoint = id.lastIndexOf(':');
checkArgument(splitPoint > 1, "Invalid handle id: %s", id);
String classLoaderId = id.substring(0, splitPoint);
String className = id.substring(splitPoint + 1);

ClassLoader classLoader = classLoaders.get(classLoaderId);
checkArgument(classLoader != null, "Unknown handle id: %s", id);

try {
Class<?> handleClass = classLoader.loadClass(className);
if (handleClass != null) {
return handleClass;
}
}
catch (ClassNotFoundException ignored) {
}
throw new IllegalArgumentException("Handle ID not found: " + id);
}

private static String classId(Class<?> handleClass)
Expand Down
Loading