Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import io.trino.spi.connector.MetadataProvider;
import io.trino.spi.type.TypeManager;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class ConnectorContextInstance
Expand All @@ -35,6 +37,7 @@ public class ConnectorContextInstance
private final PageSorter pageSorter;
private final PageIndexerFactory pageIndexerFactory;
private final Supplier<ClassLoader> duplicatePluginClassLoaderFactory;
private final AtomicBoolean pluginClassLoaderDuplicated = new AtomicBoolean();

public ConnectorContextInstance(
NodeManager nodeManager,
Expand Down Expand Up @@ -93,6 +96,7 @@ public PageIndexerFactory getPageIndexerFactory()
@Override
public ClassLoader duplicatePluginClassLoader()
{
checkState(!pluginClassLoaderDuplicated.getAndSet(true), "plugin class loader already duplicated");
return duplicatePluginClassLoaderFactory.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -213,7 +214,7 @@ public synchronized void stop()
}
}

public synchronized void addConnectorFactory(ConnectorFactory connectorFactory, Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
public synchronized void addConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory)
{
requireNonNull(connectorFactory, "connectorFactory is null");
requireNonNull(duplicatePluginClassLoaderFactory, "duplicatePluginClassLoaderFactory is null");
Expand Down Expand Up @@ -253,9 +254,13 @@ private synchronized void createCatalog(CatalogName catalogName, String connecto
// create all connectors before adding, so a broken connector does not leave the system half updated
MaterializedConnector connector = new MaterializedConnector(catalogName, createConnector(catalogName, factory, properties));

ConnectorHandleResolver connectorHandleResolver = connector.getConnector().getHandleResolver()
.orElseGet(factory.getConnectorFactory()::getHandleResolver);
checkArgument(connectorHandleResolver != null, "Connector %s does not have a handle resolver", factory);
Set<Class<?>> handleClasses = connector.getConnector().getHandleClasses();
if (handleClasses.isEmpty()) {
handleClasses = connector.getConnector().getHandleResolver()
.or(() -> Optional.ofNullable(factory.getConnectorFactory().getHandleResolver()))
.map(ConnectorHandleResolver::getHandleClasses)
.orElseThrow(() -> new IllegalArgumentException(format("Connector %s does not have a handle resolver", factory)));
}

MaterializedConnector informationSchemaConnector = new MaterializedConnector(
createInformationSchemaCatalogName(catalogName),
Expand Down Expand Up @@ -298,10 +303,10 @@ private synchronized void createCatalog(CatalogName catalogName, String connecto
addConnectorInternal(informationSchemaConnector);
addConnectorInternal(systemConnector);
catalogManager.registerCatalog(catalog);
handleResolver.addCatalogHandleResolver(catalogName.getCatalogName(), connectorHandleResolver);
handleResolver.addCatalogHandleClasses(catalogName.getCatalogName(), handleClasses);
}
catch (Throwable e) {
handleResolver.removeCatalogHandleResolver(catalogName.getCatalogName());
handleResolver.removeCatalogHandleClasses(catalogName.getCatalogName());
catalogManager.removeCatalog(catalog.getCatalogName());
removeConnectorInternal(systemConnector.getCatalogName());
removeConnectorInternal(informationSchemaConnector.getCatalogName());
Expand Down Expand Up @@ -362,7 +367,7 @@ public synchronized void dropConnection(String catalogName)
removeConnectorInternal(catalog);
removeConnectorInternal(createInformationSchemaCatalogName(catalog));
removeConnectorInternal(createSystemTablesCatalogName(catalog));
handleResolver.removeCatalogHandleResolver(catalogName);
handleResolver.removeCatalogHandleClasses(catalogName);
});
}

Expand Down Expand Up @@ -405,7 +410,7 @@ private Connector createConnector(CatalogName catalogName, InternalConnectorFact
new InternalMetadataProvider(metadataManager, typeManager),
pageSorter,
pageIndexerFactory,
factory.getDuplicatePluginClassLoaderFactory());
factory.getDuplicatePluginClassLoaderFactory(catalogName));

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getConnectorFactory().getClass().getClassLoader())) {
return factory.getConnectorFactory().create(catalogName.getCatalogName(), properties, context);
Expand All @@ -415,9 +420,9 @@ private Connector createConnector(CatalogName catalogName, InternalConnectorFact
private static class InternalConnectorFactory
{
private final ConnectorFactory connectorFactory;
private final Supplier<ClassLoader> duplicatePluginClassLoaderFactory;
private final Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory;

public InternalConnectorFactory(ConnectorFactory connectorFactory, Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
public InternalConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory)
{
this.connectorFactory = connectorFactory;
this.duplicatePluginClassLoaderFactory = duplicatePluginClassLoaderFactory;
Expand All @@ -428,9 +433,9 @@ public ConnectorFactory getConnectorFactory()
return connectorFactory;
}

public Supplier<ClassLoader> getDuplicatePluginClassLoaderFactory()
public Supplier<ClassLoader> getDuplicatePluginClassLoaderFactory(CatalogName catalogName)
{
return duplicatePluginClassLoaderFactory;
return () -> duplicatePluginClassLoaderFactory.apply(catalogName);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public GlobalSystemConnector(Set<SystemTable> systemTables, Set<Procedure> proce
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
}

@Override
public Set<Class<?>> getHandleClasses()
{
return ImmutableSet.<Class<?>>builder()
.add(SystemTableHandle.class)
.add(SystemColumnHandle.class)
.add(SystemSplit.class)
.add(GlobalSystemTransactionHandle.class)
.build();
}

@Override
public ConnectorTransactionHandle beginTransaction(TransactionId transactionId, IsolationLevel isolationLevel, boolean readOnly)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorHandleResolver;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.procedure.Procedure;

Expand Down Expand Up @@ -47,12 +46,6 @@ public String getName()
return GlobalSystemConnector.NAME;
}

@Override
public ConnectorHandleResolver getHandleResolver()
{
return new GlobalSystemHandleResolver();
}

@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private static class SystemConnectorRegistrar
@Inject
public SystemConnectorRegistrar(ConnectorManager manager, GlobalSystemConnectorFactory globalSystemConnectorFactory)
{
manager.addConnectorFactory(globalSystemConnectorFactory, globalSystemConnectorFactory.getClass()::getClassLoader);
manager.addConnectorFactory(globalSystemConnectorFactory, ignored -> globalSystemConnectorFactory.getClass().getClassLoader());
manager.createCatalog(GlobalSystemConnector.NAME, GlobalSystemConnector.NAME, ImmutableMap.of());
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package io.trino.exchange;

import io.airlift.log.Logger;
import io.trino.metadata.HandleResolver;
import io.trino.metadata.ExchangeHandleResolver;
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.exchange.ExchangeManager;
Expand Down Expand Up @@ -44,14 +44,14 @@ public class ExchangeManagerRegistry
private static final File CONFIG_FILE = new File("etc/exchange-manager.properties");
private static final String EXCHANGE_MANAGER_NAME_PROPERTY = "exchange-manager.name";

private final HandleResolver handleResolver;
private final ExchangeHandleResolver handleResolver;

private final Map<String, ExchangeManagerFactory> exchangeManagerFactories = new ConcurrentHashMap<>();

private volatile ExchangeManager exchangeManager;

@Inject
public ExchangeManagerRegistry(HandleResolver handleResolver)
public ExchangeManagerRegistry(ExchangeHandleResolver handleResolver)
{
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ public abstract class AbstractTypedJacksonModule<T>

protected AbstractTypedJacksonModule(
Class<T> baseClass,
Function<T, String> nameResolver,
Function<String, Class<? extends T>> classResolver)
Function<Object, String> nameResolver,
Function<String, Class<?>> classResolver)
{
super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);
TypeIdResolver typeResolver = new InternalTypeResolver(nameResolver, classResolver);

addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
Expand Down Expand Up @@ -133,13 +133,13 @@ private static <T> JsonSerializer<T> createSerializer(SerializerProvider provide
}
}

private static class InternalTypeResolver<T>
private static class InternalTypeResolver
extends TypeIdResolverBase
{
private final Function<T, String> nameResolver;
private final Function<String, Class<? extends T>> classResolver;
private final Function<Object, String> nameResolver;
private final Function<String, Class<?>> classResolver;

public InternalTypeResolver(Function<T, String> nameResolver, Function<String, Class<? extends T>> classResolver)
public InternalTypeResolver(Function<Object, String> nameResolver, Function<String, Class<?>> classResolver)
{
this.nameResolver = requireNonNull(nameResolver, "nameResolver is null");
this.classResolver = requireNonNull(classResolver, "classResolver is null");
Expand All @@ -151,13 +151,12 @@ public String idFromValue(Object value)
return idFromValueAndType(value, value.getClass());
}

@SuppressWarnings("unchecked")
@Override
public String idFromValueAndType(Object value, Class<?> suggestedType)
{
requireNonNull(value, "value is null");
String type = nameResolver.apply((T) value);
checkArgument(type != null, "Unknown class: %s", suggestedType.getSimpleName());
String type = nameResolver.apply(value);
checkArgument(type != null, "Unknown class: %s", value.getClass().getName());
return type;
}

Expand Down
Loading