Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions client/trino-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hdfs</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive-hadoop2</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/trino-server-rpm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@
<destination>/usr/lib/trino/plugin</destination>
<base>${server.tar.package}/plugin</base>
<includes>
<include>*/*</include>
<include>*/**</include>
</includes>
</rule>

Expand Down
12 changes: 12 additions & 0 deletions core/trino-server/src/main/provisio/trino.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
<artifact id="${project.groupId}:trino-delta-lake:zip:${project.version}">
<unpack />
</artifact>
<artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
<unpack useRoot="true" />
</artifact>
</artifactSet>

<artifactSet to="plugin/druid">
Expand Down Expand Up @@ -114,6 +117,9 @@
<artifact id="${project.groupId}:trino-hive-hadoop2:zip:${project.version}">
<unpack />
</artifact>
<artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
<unpack useRoot="true" />
</artifact>
</artifactSet>

<artifactSet to="plugin/http-event-listener">
Expand All @@ -126,12 +132,18 @@
<artifact id="${project.groupId}:trino-hudi:zip:${project.version}">
<unpack />
</artifact>
<artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
<unpack useRoot="true" />
</artifact>
</artifactSet>

<artifactSet to="plugin/iceberg">
<artifact id="${project.groupId}:trino-iceberg:zip:${project.version}">
<unpack />
</artifact>
<artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
<unpack useRoot="true" />
</artifact>
</artifactSet>

<artifactSet to="plugin/ignite">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ default PageIndexerFactory getPageIndexerFactory()
throw new UnsupportedOperationException();
}

@Deprecated(forRemoval = true)
default ClassLoader duplicatePluginClassLoader()
{
throw new UnsupportedOperationException();
Expand Down
11 changes: 8 additions & 3 deletions lib/trino-filesystem-manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
Expand Down Expand Up @@ -59,12 +64,12 @@

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hdfs</artifactId>
<artifactId>trino-spi</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,62 @@
package io.trino.filesystem.manager;

import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemModule;
import io.trino.filesystem.gcs.GcsFileSystemFactory;
import io.trino.filesystem.gcs.GcsFileSystemModule;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.filesystem.hdfs.HdfsFileSystemModule;
import io.trino.filesystem.s3.S3FileSystemFactory;
import io.trino.filesystem.s3.S3FileSystemModule;
import io.trino.filesystem.tracing.TracingFileSystemFactory;
import io.trino.hdfs.HdfsModule;
import io.trino.hdfs.authentication.HdfsAuthenticationModule;
import io.trino.hdfs.azure.HiveAzureModule;
import io.trino.hdfs.cos.HiveCosModule;
import io.trino.hdfs.gcs.HiveGcsModule;
import io.trino.hdfs.rubix.RubixEnabledConfig;
import io.trino.hdfs.rubix.RubixModule;
import io.trino.hdfs.s3.HiveS3Module;
import io.trino.spi.NodeManager;

import java.util.Map;
import java.util.Optional;

import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static java.util.Objects.requireNonNull;

public class FileSystemModule
extends AbstractConfigurationAwareModule
{
private final String catalogName;
private final NodeManager nodeManager;
private final OpenTelemetry openTelemetry;

public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
}

@Override
protected void setup(Binder binder)
{
FileSystemConfig config = buildConfigObject(FileSystemConfig.class);

binder.bind(HdfsFileSystemFactoryHolder.class).in(SINGLETON);
newOptionalBinder(binder, HdfsFileSystemLoader.class);

if (config.isHadoopEnabled()) {
install(new HdfsFileSystemModule());
install(new HdfsModule());
install(new HdfsAuthenticationModule());
install(conditionalModule(RubixEnabledConfig.class, RubixEnabledConfig::isCacheEnabled, new RubixModule()));
install(new HiveCosModule());
install(new HiveGcsModule());
HdfsFileSystemLoader loader = new HdfsFileSystemLoader(
getProperties(),
!config.isNativeAzureEnabled(),
!config.isNativeGcsEnabled(),
!config.isNativeS3Enabled(),
catalogName,
nodeManager,
openTelemetry);

loader.configure().forEach(this::consumeProperty);
binder.bind(HdfsFileSystemLoader.class).toInstance(loader);
}

var factories = newMapBinder(binder, String.class, TrinoFileSystemFactory.class);
Expand All @@ -71,48 +79,32 @@ protected void setup(Binder binder)
factories.addBinding("abfs").to(AzureFileSystemFactory.class);
factories.addBinding("abfss").to(AzureFileSystemFactory.class);
}
else if (config.isHadoopEnabled()) {
install(new HiveAzureModule());
}

if (config.isNativeS3Enabled()) {
install(new S3FileSystemModule());
factories.addBinding("s3").to(S3FileSystemFactory.class);
factories.addBinding("s3a").to(S3FileSystemFactory.class);
factories.addBinding("s3n").to(S3FileSystemFactory.class);
}
else if (config.isHadoopEnabled()) {
install(new HiveS3Module());
}

if (config.isNativeGcsEnabled()) {
install(new GcsFileSystemModule());
factories.addBinding("gs").to(GcsFileSystemFactory.class);
}
else {
install(new HiveGcsModule());
}
}

@Provides
@Singleton
public TrinoFileSystemFactory createFileSystemFactory(
HdfsFileSystemFactoryHolder hdfsFileSystemFactory,
Optional<HdfsFileSystemLoader> hdfsFileSystemLoader,
LifeCycleManager lifeCycleManager,
Map<String, TrinoFileSystemFactory> factories,
Tracer tracer)
{
TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(hdfsFileSystemFactory.value(), factories);
return new TracingFileSystemFactory(tracer, delegate);
}
Optional<TrinoFileSystemFactory> hdfsFactory = hdfsFileSystemLoader.map(HdfsFileSystemLoader::create);
hdfsFactory.ifPresent(lifeCycleManager::addInstance);

public static class HdfsFileSystemFactoryHolder
{
@Inject(optional = true)
private HdfsFileSystemFactory hdfsFileSystemFactory;

public Optional<TrinoFileSystemFactory> value()
{
return Optional.ofNullable(hdfsFileSystemFactory);
}
TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(hdfsFactory, factories);
return new TracingFileSystemFactory(tracer, delegate);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.manager;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Enumeration;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;

// based on io.trino.server.PluginClassLoader
final class HdfsClassLoader
extends URLClassLoader
{
public HdfsClassLoader(List<URL> urls)
{
// This class loader should not have access to the system (application) class loader
super(urls.toArray(URL[]::new), getPlatformClassLoader());
}

@Override
protected Class<?> loadClass(String name, boolean resolve)
throws ClassNotFoundException
{
synchronized (getClassLoadingLock(name)) {
// Check if class is in the loaded classes cache
Class<?> cachedClass = findLoadedClass(name);
if (cachedClass != null) {
return resolveClass(cachedClass, resolve);
}

// If this is an override class, only check override class loader
if (isOverrideClass(name)) {
return resolveClass(overrideClassLoader().loadClass(name), resolve);
}

// Look for class locally
return super.loadClass(name, resolve);
}
}

private Class<?> resolveClass(Class<?> clazz, boolean resolve)
{
if (resolve) {
resolveClass(clazz);
}
return clazz;
}

@Override
public URL getResource(String name)
{
// If this is an override resource, only check override class loader
if (isOverrideResource(name)) {
return overrideClassLoader().getResource(name);
}

// Look for resource locally
return super.getResource(name);
}

@Override
public Enumeration<URL> getResources(String name)
throws IOException
{
// If this is an override resource, use override resources
if (isOverrideResource(name)) {
return overrideClassLoader().getResources(name);
}

// Use local resources
return super.getResources(name);
}

private ClassLoader overrideClassLoader()
{
return getClass().getClassLoader();
}

private static boolean isOverrideResource(String name)
{
return isOverrideClass(name.replace('.', '/'));
}

private static boolean isOverrideClass(String name)
{
// SPI packages from io.trino.server.PluginManager and dependencies of trino-filesystem
return hasPackage(name, "io.trino.spi.") ||
hasPackage(name, "com.fasterxml.jackson.annotation.") ||
hasPackage(name, "io.airlift.slice.") ||
hasPackage(name, "org.openjdk.jol.") ||
hasPackage(name, "io.opentelemetry.api.") ||
hasPackage(name, "io.opentelemetry.context.") ||
hasPackage(name, "com.google.common.") ||
hasExactPackage(name, "io.trino.memory.context.") ||
hasExactPackage(name, "io.trino.filesystem.");
}

private static boolean hasPackage(String name, String packageName)
{
checkArgument(!packageName.isEmpty() && packageName.charAt(packageName.length() - 1) == '.');
return name.startsWith(packageName);
}

private static boolean hasExactPackage(String name, String packageName)
{
checkArgument(!packageName.isEmpty() && packageName.charAt(packageName.length() - 1) == '.');
return name.startsWith(packageName) && (name.lastIndexOf('.') == (packageName.length() - 1));
}
}
Loading