diff --git a/client/trino-jdbc/pom.xml b/client/trino-jdbc/pom.xml index e1a8bb5a67d6..1fb6ab0bc8c5 100644 --- a/client/trino-jdbc/pom.xml +++ b/client/trino-jdbc/pom.xml @@ -186,6 +186,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive-hadoop2 diff --git a/core/trino-server-rpm/pom.xml b/core/trino-server-rpm/pom.xml index 76a59fe85be2..5871b37d5e6f 100644 --- a/core/trino-server-rpm/pom.xml +++ b/core/trino-server-rpm/pom.xml @@ -252,7 +252,7 @@ /usr/lib/trino/plugin ${server.tar.package}/plugin - */* + */** diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml index acc6fd6005db..519af7a4e530 100644 --- a/core/trino-server/src/main/provisio/trino.xml +++ b/core/trino-server/src/main/provisio/trino.xml @@ -66,6 +66,9 @@ + + + @@ -114,6 +117,9 @@ + + + @@ -126,12 +132,18 @@ + + + + + + diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorContext.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorContext.java index 07f8a7f4742d..db8c9d653b98 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorContext.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorContext.java @@ -79,6 +79,7 @@ default PageIndexerFactory getPageIndexerFactory() throw new UnsupportedOperationException(); } + @Deprecated(forRemoval = true) default ClassLoader duplicatePluginClassLoader() { throw new UnsupportedOperationException(); diff --git a/lib/trino-filesystem-manager/pom.xml b/lib/trino-filesystem-manager/pom.xml index 8b86115c9acf..b82ba37f2891 100644 --- a/lib/trino-filesystem-manager/pom.xml +++ b/lib/trino-filesystem-manager/pom.xml @@ -27,6 +27,11 @@ guice + + io.airlift + bootstrap + + io.airlift configuration @@ -59,12 +64,12 @@ io.trino - trino-hdfs + trino-spi - io.trino - trino-spi + jakarta.annotation + jakarta.annotation-api diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index 535067fb74f3..57cd749bb05e 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -14,54 +14,62 @@ package io.trino.filesystem.manager; import com.google.inject.Binder; -import com.google.inject.Inject; import com.google.inject.Provides; import com.google.inject.Singleton; +import io.airlift.bootstrap.LifeCycleManager; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.azure.AzureFileSystemFactory; import io.trino.filesystem.azure.AzureFileSystemModule; import io.trino.filesystem.gcs.GcsFileSystemFactory; import io.trino.filesystem.gcs.GcsFileSystemModule; -import io.trino.filesystem.hdfs.HdfsFileSystemFactory; -import io.trino.filesystem.hdfs.HdfsFileSystemModule; import io.trino.filesystem.s3.S3FileSystemFactory; import io.trino.filesystem.s3.S3FileSystemModule; import io.trino.filesystem.tracing.TracingFileSystemFactory; -import io.trino.hdfs.HdfsModule; -import io.trino.hdfs.authentication.HdfsAuthenticationModule; -import io.trino.hdfs.azure.HiveAzureModule; -import io.trino.hdfs.cos.HiveCosModule; -import io.trino.hdfs.gcs.HiveGcsModule; -import io.trino.hdfs.rubix.RubixEnabledConfig; -import 
io.trino.hdfs.rubix.RubixModule; -import io.trino.hdfs.s3.HiveS3Module; +import io.trino.spi.NodeManager; import java.util.Map; import java.util.Optional; -import static com.google.inject.Scopes.SINGLETON; import static com.google.inject.multibindings.MapBinder.newMapBinder; -import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; +import static java.util.Objects.requireNonNull; public class FileSystemModule extends AbstractConfigurationAwareModule { + private final String catalogName; + private final NodeManager nodeManager; + private final OpenTelemetry openTelemetry; + + public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null"); + } + @Override protected void setup(Binder binder) { FileSystemConfig config = buildConfigObject(FileSystemConfig.class); - binder.bind(HdfsFileSystemFactoryHolder.class).in(SINGLETON); + newOptionalBinder(binder, HdfsFileSystemLoader.class); if (config.isHadoopEnabled()) { - install(new HdfsFileSystemModule()); - install(new HdfsModule()); - install(new HdfsAuthenticationModule()); - install(conditionalModule(RubixEnabledConfig.class, RubixEnabledConfig::isCacheEnabled, new RubixModule())); - install(new HiveCosModule()); - install(new HiveGcsModule()); + HdfsFileSystemLoader loader = new HdfsFileSystemLoader( + getProperties(), + !config.isNativeAzureEnabled(), + !config.isNativeGcsEnabled(), + !config.isNativeS3Enabled(), + catalogName, + nodeManager, + openTelemetry); + + loader.configure().forEach(this::consumeProperty); + binder.bind(HdfsFileSystemLoader.class).toInstance(loader); } var factories = newMapBinder(binder, String.class, TrinoFileSystemFactory.class); @@ -71,9 +79,6 @@ protected void setup(Binder binder) factories.addBinding("abfs").to(AzureFileSystemFactory.class); factories.addBinding("abfss").to(AzureFileSystemFactory.class); } - else if (config.isHadoopEnabled()) { - install(new HiveAzureModule()); - } if (config.isNativeS3Enabled()) { install(new S3FileSystemModule()); @@ -81,38 +86,25 @@ else if (config.isHadoopEnabled()) { factories.addBinding("s3a").to(S3FileSystemFactory.class); factories.addBinding("s3n").to(S3FileSystemFactory.class); } - else if (config.isHadoopEnabled()) { - install(new HiveS3Module()); - } if (config.isNativeGcsEnabled()) { install(new GcsFileSystemModule()); factories.addBinding("gs").to(GcsFileSystemFactory.class); } - else { - install(new HiveGcsModule()); - } } @Provides @Singleton public TrinoFileSystemFactory createFileSystemFactory( - HdfsFileSystemFactoryHolder hdfsFileSystemFactory, + Optional hdfsFileSystemLoader, + LifeCycleManager lifeCycleManager, Map factories, Tracer tracer) { - TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(hdfsFileSystemFactory.value(), factories); - return new TracingFileSystemFactory(tracer, delegate); - } + Optional hdfsFactory = hdfsFileSystemLoader.map(HdfsFileSystemLoader::create); + hdfsFactory.ifPresent(lifeCycleManager::addInstance); - public static class HdfsFileSystemFactoryHolder - { - @Inject(optional = true) - private HdfsFileSystemFactory hdfsFileSystemFactory; - - public Optional value() - { - return Optional.ofNullable(hdfsFileSystemFactory); - } + TrinoFileSystemFactory delegate = 
new SwitchingFileSystemFactory(hdfsFactory, factories); + return new TracingFileSystemFactory(tracer, delegate); } } diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/HdfsClassLoader.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/HdfsClassLoader.java new file mode 100644 index 000000000000..172e916ebc15 --- /dev/null +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/HdfsClassLoader.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.manager; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Enumeration; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + +// based on io.trino.server.PluginClassLoader +final class HdfsClassLoader + extends URLClassLoader +{ + public HdfsClassLoader(List urls) + { + // This class loader should not have access to the system (application) class loader + super(urls.toArray(URL[]::new), getPlatformClassLoader()); + } + + @Override + protected Class loadClass(String name, boolean resolve) + throws ClassNotFoundException + { + synchronized (getClassLoadingLock(name)) { + // Check if class is in the loaded classes cache + Class cachedClass = findLoadedClass(name); + if (cachedClass != null) { + return resolveClass(cachedClass, resolve); + } + + // If this is an override class, only check override class loader + if (isOverrideClass(name)) { + return resolveClass(overrideClassLoader().loadClass(name), resolve); + } + + // Look for class locally + return super.loadClass(name, resolve); + } + } + + private Class resolveClass(Class clazz, boolean resolve) + { + if (resolve) { + resolveClass(clazz); + } + return clazz; + } + + @Override + public URL getResource(String name) + { + // If this is an override resource, only check override class loader + if (isOverrideResource(name)) { + return overrideClassLoader().getResource(name); + } + + // Look for resource locally + return super.getResource(name); + } + + @Override + public Enumeration getResources(String name) + throws IOException + { + // If this is an override resource, use override resources + if (isOverrideResource(name)) { + return overrideClassLoader().getResources(name); + } + + // Use local resources + return super.getResources(name); + } + + private ClassLoader overrideClassLoader() + { + return getClass().getClassLoader(); + } + + private static boolean isOverrideResource(String name) + { + return isOverrideClass(name.replace('.', '/')); + } + + private static boolean isOverrideClass(String name) + { + // SPI packages from io.trino.server.PluginManager and dependencies of trino-filesystem + return hasPackage(name, "io.trino.spi.") || + hasPackage(name, "com.fasterxml.jackson.annotation.") || + hasPackage(name, "io.airlift.slice.") || + hasPackage(name, "org.openjdk.jol.") || + hasPackage(name, "io.opentelemetry.api.") || + hasPackage(name, 
"io.opentelemetry.context.") || + hasPackage(name, "com.google.common.") || + hasExactPackage(name, "io.trino.memory.context.") || + hasExactPackage(name, "io.trino.filesystem."); + } + + private static boolean hasPackage(String name, String packageName) + { + checkArgument(!packageName.isEmpty() && packageName.charAt(packageName.length() - 1) == '.'); + return name.startsWith(packageName); + } + + private static boolean hasExactPackage(String name, String packageName) + { + checkArgument(!packageName.isEmpty() && packageName.charAt(packageName.length() - 1) == '.'); + return name.startsWith(packageName) && (name.lastIndexOf('.') == (packageName.length() - 1)); + } +} diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/HdfsFileSystemLoader.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/HdfsFileSystemLoader.java new file mode 100644 index 000000000000..c3e0785ac0f4 --- /dev/null +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/HdfsFileSystemLoader.java @@ -0,0 +1,177 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.manager; + +import io.opentelemetry.api.OpenTelemetry; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.spi.NodeManager; +import io.trino.spi.Plugin; +import io.trino.spi.classloader.ThreadContextClassLoader; +import jakarta.annotation.PreDestroy; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.DirectoryStream; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.Streams.stream; +import static java.nio.file.Files.newDirectoryStream; + +final class HdfsFileSystemLoader +{ + private final HdfsClassLoader classLoader; + private final Object manager; + + public HdfsFileSystemLoader( + Map config, + boolean azureEnabled, + boolean gcsEnabled, + boolean s3Enabled, + String catalogName, + NodeManager nodeManager, + OpenTelemetry openTelemetry) + { + Class clazz = tryLoadExistingHdfsManager(); + + // check if we are running inside a plugin class loader (full server mode) + if (!getClass().getClassLoader().equals(Plugin.class.getClassLoader())) { + verify(clazz == null, "HDFS should not be on the plugin classpath"); + File sourceFile = getCurrentClassLocation(); + File directory; + if (sourceFile.isDirectory()) { + // running DevelopmentServer in the IDE + verify(sourceFile.getPath().endsWith("/target/classes"), "Source file not in 'target' directory: %s", sourceFile); + directory = new File(sourceFile.getParentFile().getParentFile().getParentFile(), "trino-hdfs/target/hdfs"); + } + else { + // normal server mode where HDFS JARs are in a subdirectory of the plugin + directory = new File(sourceFile.getParentFile(), "hdfs"); + } + 
verify(directory.isDirectory(), "HDFS directory is missing: %s", directory); + classLoader = createClassLoader(directory); + clazz = loadHdfsManager(classLoader); + } + else { + verify(clazz != null, "HDFS should be on the classpath for tests"); + classLoader = null; + } + + try (var ignored = new ThreadContextClassLoader(classLoader)) { + manager = clazz.getConstructor(Map.class, boolean.class, boolean.class, boolean.class, String.class, NodeManager.class, OpenTelemetry.class) + .newInstance(config, azureEnabled, gcsEnabled, s3Enabled, catalogName, nodeManager, openTelemetry); + } + catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + public Set configure() + { + try (var ignored = new ThreadContextClassLoader(classLoader)) { + return (Set) manager.getClass().getMethod("configure").invoke(manager); + } + catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to configure HDFS:\n%s\n%s\n%s".formatted("<".repeat(70), e.getCause(), ">".repeat(70)), e); + } + } + + public TrinoFileSystemFactory create() + { + try (var ignored = new ThreadContextClassLoader(classLoader)) { + return (TrinoFileSystemFactory) manager.getClass().getMethod("create").invoke(manager); + } + catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + @PreDestroy + public void stop() + throws IOException, ReflectiveOperationException + { + try (classLoader; var ignored = new ThreadContextClassLoader(classLoader)) { + manager.getClass().getMethod("stop").invoke(manager); + } + } + + private File getCurrentClassLocation() + { + try { + return new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI()); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private Class tryLoadExistingHdfsManager() + { + try { + return loadHdfsManager(getClass().getClassLoader()); + } + catch (RuntimeException e) { + return null; + } + } + + private static Class loadHdfsManager(ClassLoader classLoader) + { + try { + return classLoader.loadClass("io.trino.filesystem.hdfs.HdfsFileSystemManager"); + } + catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + private static HdfsClassLoader createClassLoader(File path) + { + List urls = buildClassPath(path); + verify(!urls.isEmpty(), "HDFS directory is empty: %s", path); + return new HdfsClassLoader(urls); + } + + private static List buildClassPath(File path) + { + try (DirectoryStream directoryStream = newDirectoryStream(path.toPath())) { + return stream(directoryStream) + .map(Path::toFile) + .sorted().toList().stream() + .map(HdfsFileSystemLoader::fileToUrl) + .toList(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static URL fileToUrl(File file) + { + try { + return file.toURI().toURL(); + } + catch (MalformedURLException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/lib/trino-hdfs/pom.xml b/lib/trino-hdfs/pom.xml index e2efb802453c..813b9c6f44f2 100644 --- a/lib/trino-hdfs/pom.xml +++ b/lib/trino-hdfs/pom.xml @@ -76,6 +76,11 @@ failsafe + + io.airlift + bootstrap + + io.airlift concurrent @@ -283,6 +288,21 @@ + + + ca.vanzyl.provisio.maven.plugins + provisio-maven-plugin + + + + provision + + + ${project.build.directory}/hdfs + + + + diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystemManager.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystemManager.java new file mode 100644 index 
000000000000..19c89f815213 --- /dev/null +++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystemManager.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.hdfs; + +import com.google.inject.Injector; +import com.google.inject.Module; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.bootstrap.LifeCycleManager; +import io.opentelemetry.api.OpenTelemetry; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.hdfs.HdfsModule; +import io.trino.hdfs.authentication.HdfsAuthenticationModule; +import io.trino.hdfs.azure.HiveAzureModule; +import io.trino.hdfs.cos.HiveCosModule; +import io.trino.hdfs.gcs.HiveGcsModule; +import io.trino.hdfs.rubix.RubixEnabledConfig; +import io.trino.hdfs.rubix.RubixModule; +import io.trino.hdfs.s3.HiveS3Module; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule; +import io.trino.plugin.base.jmx.MBeanServerModule; +import io.trino.spi.NodeManager; +import org.weakref.jmx.guice.MBeanModule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static io.airlift.configuration.ConditionalModule.conditionalModule; + +public final class HdfsFileSystemManager +{ + private final Bootstrap bootstrap; + private LifeCycleManager lifecycleManager; + + public HdfsFileSystemManager( + Map config, + boolean azureEnabled, + boolean gcsEnabled, + boolean s3Enabled, + String catalogName, + NodeManager nodeManager, + OpenTelemetry openTelemetry) + { + List modules = new ArrayList<>(); + + modules.add(new MBeanModule()); + modules.add(new MBeanServerModule()); + modules.add(new ConnectorObjectNameGeneratorModule("", "")); + + modules.add(new HdfsFileSystemModule()); + modules.add(new HdfsModule()); + modules.add(new HdfsAuthenticationModule()); + modules.add(new HiveCosModule()); + + modules.add(conditionalModule(RubixEnabledConfig.class, RubixEnabledConfig::isCacheEnabled, new RubixModule())); + + modules.add(binder -> { + binder.bind(NodeManager.class).toInstance(nodeManager); + binder.bind(OpenTelemetry.class).toInstance(openTelemetry); + binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); + }); + + if (azureEnabled) { + modules.add(new HiveAzureModule()); + } + if (gcsEnabled) { + modules.add(new HiveGcsModule()); + } + if (s3Enabled) { + modules.add(new HiveS3Module()); + } + + bootstrap = new Bootstrap(modules) + .doNotInitializeLogging() + .setRequiredConfigurationProperties(Map.of()) + .setOptionalConfigurationProperties(config); + } + + public Set configure() + { + return bootstrap.configure(); + } + + public TrinoFileSystemFactory create() + { + Injector injector = bootstrap.initialize(); + lifecycleManager = injector.getInstance(LifeCycleManager.class); + return injector.getInstance(HdfsFileSystemFactory.class); + } + + public void stop() + { + if (lifecycleManager != null) { + lifecycleManager.stop(); + } + } +} diff --git 
a/lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsEnvironment.java b/lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsEnvironment.java index 7608e1f73e3f..7950a9b02fda 100644 --- a/lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsEnvironment.java +++ b/lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsEnvironment.java @@ -23,6 +23,7 @@ import io.trino.hdfs.authentication.HdfsAuthentication; import io.trino.hdfs.gcs.GcsStorageFactory; import io.trino.spi.Plugin; +import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.security.ConnectorIdentity; import jakarta.annotation.PreDestroy; import org.apache.hadoop.conf.Configuration; @@ -82,7 +83,7 @@ public HdfsEnvironment( public void shutdown() throws IOException { - // shut down if running in a plugin classloader + // shut down if running in an isolated classloader if (!getClass().getClassLoader().equals(Plugin.class.getClassLoader())) { FileSystemFinalizerService.shutdown(); stopFileSystemStatsThread(); @@ -104,14 +105,16 @@ public FileSystem getFileSystem(HdfsContext context, Path path) public FileSystem getFileSystem(ConnectorIdentity identity, Path path, Configuration configuration) throws IOException { - return hdfsAuthentication.doAs(identity, () -> { - FileSystem fileSystem = path.getFileSystem(configuration); - fileSystem.setVerifyChecksum(verifyChecksum); - if (getRawFileSystem(fileSystem) instanceof OpenTelemetryAwareFileSystem fs) { - fs.setOpenTelemetry(openTelemetry); - } - return fileSystem; - }); + try (var ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + return hdfsAuthentication.doAs(identity, () -> { + FileSystem fileSystem = path.getFileSystem(configuration); + fileSystem.setVerifyChecksum(verifyChecksum); + if (getRawFileSystem(fileSystem) instanceof OpenTelemetryAwareFileSystem fs) { + fs.setOpenTelemetry(openTelemetry); + } + return fileSystem; + }); + } } public Optional getNewDirectoryPermissions() @@ -127,7 +130,9 @@ public boolean isNewFileInheritOwnership() public R doAs(ConnectorIdentity identity, GenericExceptionAction action) throws E { - return hdfsAuthentication.doAs(identity, action); + try (var ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + return hdfsAuthentication.doAs(identity, action); + } } public Storage createGcsStorage(HdfsContext context, Path path) diff --git a/lib/trino-hdfs/src/main/provisio/hdfs.xml b/lib/trino-hdfs/src/main/provisio/hdfs.xml new file mode 100644 index 000000000000..887dcfeacf6c --- /dev/null +++ b/lib/trino-hdfs/src/main/provisio/hdfs.xml @@ -0,0 +1,4 @@ + + + + diff --git a/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystemManager.java b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystemManager.java new file mode 100644 index 000000000000..83dde99c7bed --- /dev/null +++ b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystemManager.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.filesystem.hdfs; + +import com.google.common.collect.ImmutableMap; +import io.opentelemetry.api.OpenTelemetry; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.spi.security.ConnectorIdentity; +import io.trino.testing.TestingNodeManager; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Set; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +class TestHdfsFileSystemManager +{ + @Test + void testManager() + throws IOException + { + HdfsFileSystemManager manager = new HdfsFileSystemManager( + ImmutableMap.builder() + .put("unused-property", "ignored") + .put("hive.dfs.verify-checksum", "false") + .put("hive.s3.region", "us-west-1") + .buildOrThrow(), + true, + true, + true, + "test", + new TestingNodeManager(), + OpenTelemetry.noop()); + + Set used = manager.configure(); + assertThat(used).containsExactly("hive.dfs.verify-checksum", "hive.s3.region"); + + TrinoFileSystemFactory factory = manager.create(); + TrinoFileSystem fileSystem = factory.create(ConnectorIdentity.ofUser("test")); + + Location location = Location.of("/tmp/" + UUID.randomUUID()); + assertThat(fileSystem.newInputFile(location).exists()).isFalse(); + + manager.stop(); + } +} diff --git a/lib/trino-orc/pom.xml b/lib/trino-orc/pom.xml index 6208e2e87e84..04f9293efcbf 100644 --- a/lib/trino-orc/pom.xml +++ b/lib/trino-orc/pom.xml @@ -114,12 +114,6 @@ runtime - - io.trino.hadoop - hadoop-apache - runtime - - io.airlift junit-extensions @@ -150,6 +144,12 @@ test + + io.trino.hadoop + hadoop-apache + test + + io.trino.hive hive-apache diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index 19ba1084696a..932f6a96b39e 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -258,12 +258,6 @@ runtime - - io.trino - trino-hdfs - runtime - - io.trino trino-memory-context @@ -271,8 +265,8 @@ - io.trino.hadoop - hadoop-apache + org.jetbrains + annotations runtime @@ -314,6 +308,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive @@ -396,20 +396,20 @@ - io.trino.tpch - tpch + io.trino.hadoop + hadoop-apache test - org.assertj - assertj-core + io.trino.tpch + tpch test - org.jetbrains - annotations + org.assertj + assertj-core test diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java index 01140d080070..df3cc06f5b77 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java @@ -18,12 +18,12 @@ import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; -import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.Optional; -import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.inject.util.Modules.EMPTY_MODULE; import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch; +import static io.trino.plugin.deltalake.InternalDeltaLakeConnectorFactory.createConnector; import static java.util.Objects.requireNonNull; public class DeltaLakeConnectorFactory @@ -31,9 +31,14 @@ public class DeltaLakeConnectorFactory { public static final String CONNECTOR_NAME = "delta_lake"; - 
private final Class module; + private final Module module; - public DeltaLakeConnectorFactory(Class module) + public DeltaLakeConnectorFactory() + { + this(EMPTY_MODULE); + } + + public DeltaLakeConnectorFactory(Module module) { this.module = requireNonNull(module, "module is null"); } @@ -48,22 +53,6 @@ public String getName() public Connector create(String catalogName, Map config, ConnectorContext context) { checkStrictSpiVersionMatch(context, this); - - ClassLoader classLoader = context.duplicatePluginClassLoader(); - try { - Class moduleClass = classLoader.loadClass(Module.class.getName()); - Object moduleInstance = classLoader.loadClass(module.getName()).getConstructor().newInstance(); - return (Connector) classLoader.loadClass(InternalDeltaLakeConnectorFactory.class.getName()) - .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class, Optional.class, moduleClass) - .invoke(null, catalogName, config, context, Optional.empty(), Optional.empty(), moduleInstance); - } - catch (InvocationTargetException e) { - Throwable targetException = e.getTargetException(); - throwIfUnchecked(targetException); - throw new RuntimeException(targetException); - } - catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } + return createConnector(catalogName, config, context, Optional.empty(), Optional.empty(), module); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePlugin.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePlugin.java index f4263e59dd80..f4b44d0e95de 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePlugin.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePlugin.java @@ -14,8 +14,6 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; -import com.google.inject.Module; -import io.trino.plugin.hive.HiveConnectorFactory.EmptyModule; import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; @@ -25,11 +23,6 @@ public class DeltaLakePlugin @Override public Iterable getConnectorFactories() { - return ImmutableList.of(getConnectorFactory(EmptyModule.class)); - } - - public ConnectorFactory getConnectorFactory(Class module) - { - return new DeltaLakeConnectorFactory(module); + return ImmutableList.of(new DeltaLakeConnectorFactory()); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index b73f58111914..7c27b8d151cd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -73,7 +73,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.function.UnaryOperator.identity; -public class DeltaLakeWriter +public final class DeltaLakeWriter implements FileWriter { private final ParquetFileWriter fileWriter; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java index a9379152401b..2f39dbbd64ab 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java +++ 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java @@ -91,7 +91,7 @@ public static Connector createConnector( new DeltaLakeSynchronizerModule(), fileSystemFactory .map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory)) - .orElseGet(FileSystemModule::new), + .orElseGet(() -> new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry())), binder -> { binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); binder.bind(Tracer.class).toInstance(context.getTracer()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java index c06f4a1cb959..54fa8be4597b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java @@ -113,11 +113,11 @@ public void setUp() .put("delta.hide-non-delta-lake-tables", "true") .buildOrThrow(); + ConnectorContext context = new TestingConnectorContext(); Bootstrap app = new Bootstrap( // connector dependencies new JsonModule(), binder -> { - ConnectorContext context = new TestingConnectorContext(); binder.bind(CatalogName.class).toInstance(new CatalogName("test")); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); @@ -130,7 +130,7 @@ public void setUp() new DeltaLakeMetastoreModule(), new DeltaLakeModule(), // test setup - new FileSystemModule()); + new FileSystemModule("test", context.getNodeManager(), context.getOpenTelemetry())); Injector injector = app .doNotInitializeLogging() diff --git a/plugin/trino-geospatial/pom.xml b/plugin/trino-geospatial/pom.xml index 1d0d11382103..61fa6c3bdf2d 100644 --- a/plugin/trino-geospatial/pom.xml +++ b/plugin/trino-geospatial/pom.xml @@ -114,6 +114,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive diff --git a/plugin/trino-hive-hadoop2/pom.xml b/plugin/trino-hive-hadoop2/pom.xml index e18ca769a954..02a3f5437a36 100644 --- a/plugin/trino-hive-hadoop2/pom.xml +++ b/plugin/trino-hive-hadoop2/pom.xml @@ -112,24 +112,12 @@ runtime - - io.trino - trino-hdfs - runtime - - io.trino trino-plugin-toolkit runtime - - io.trino.hadoop - hadoop-apache - runtime - - org.alluxio alluxio-shaded-client @@ -148,6 +136,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive @@ -186,6 +180,12 @@ test + + io.trino.hadoop + hadoop-apache + test + + io.trino.hive hive-apache diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHivePlugin.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHivePlugin.java index 6957c6d2a309..b1dc799428d0 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHivePlugin.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHivePlugin.java @@ -307,7 +307,7 @@ public void testRubixCacheWithNonExistingCacheDirectory() .put("bootstrap.quiet", "true") .buildOrThrow(), new TestingConnectorContext())) - .hasRootCauseMessage("None of the cache parent directories exists"); + .hasMessageContaining("None of the cache parent directories exists"); assertThatThrownBy(() -> connectorFactory.create( "test", @@ -318,7 +318,7 @@ public void 
testRubixCacheWithNonExistingCacheDirectory() .put("bootstrap.quiet", "true") .buildOrThrow(), new TestingConnectorContext())) - .hasRootCauseMessage("caching directories were not provided"); + .hasMessageContaining("caching directories were not provided"); // cache directories should not be required when cache is not explicitly started on coordinator connectorFactory.create( diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index c91a4f9ab119..d2bd5b9582ee 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -294,18 +294,6 @@ runtime - - io.trino - trino-hdfs - runtime - - - - io.trino.hadoop - hadoop-apache - runtime - - jakarta.xml.bind jakarta.xml.bind-api @@ -385,6 +373,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-main @@ -441,6 +435,12 @@ test + + io.trino.hadoop + hadoop-apache + test + + io.trino.hive hive-apache diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java index f7e50db00269..d2a861c28e04 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java @@ -13,30 +13,29 @@ */ package io.trino.plugin.hive; -import com.google.inject.Binder; import com.google.inject.Module; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; -import java.lang.reflect.InvocationTargetException; import java.util.Map; -import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.inject.util.Modules.EMPTY_MODULE; import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch; +import static io.trino.plugin.hive.InternalHiveConnectorFactory.createConnector; import static java.util.Objects.requireNonNull; public class HiveConnectorFactory implements ConnectorFactory { - private final Class module; + private final Module module; public HiveConnectorFactory() { - this(EmptyModule.class); + this(EMPTY_MODULE); } - public HiveConnectorFactory(Class module) + public HiveConnectorFactory(Module module) { this.module = requireNonNull(module, "module is null"); } @@ -51,29 +50,6 @@ public String getName() public Connector create(String catalogName, Map config, ConnectorContext context) { checkStrictSpiVersionMatch(context, this); - - ClassLoader classLoader = context.duplicatePluginClassLoader(); - try { - Object moduleInstance = classLoader.loadClass(module.getName()).getConstructor().newInstance(); - Class moduleClass = classLoader.loadClass(Module.class.getName()); - return (Connector) classLoader.loadClass(InternalHiveConnectorFactory.class.getName()) - .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, moduleClass) - .invoke(null, catalogName, config, context, moduleInstance); - } - catch (InvocationTargetException e) { - Throwable targetException = e.getTargetException(); - throwIfUnchecked(targetException); - throw new RuntimeException(targetException); - } - catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - - public static class EmptyModule - implements Module - { - @Override - public void configure(Binder binder) {} + return createConnector(catalogName, config, context, module); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java index 4f5c0c2c656a..69a5fe1089d1 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java @@ -110,7 +110,7 @@ public static Connector createConnector( new HiveSecurityModule(), fileSystemFactory .map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory)) - .orElseGet(FileSystemModule::new), + .orElseGet(() -> new FileSystemModule(catalogName, context.getNodeManager(), openTelemetry.orElse(context.getOpenTelemetry()))), new HiveProcedureModule(), new MBeanServerModule(), binder -> { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java index 48d39436a608..0a27cb663f75 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java @@ -58,7 +58,7 @@ import static io.trino.spi.type.IntegerType.INTEGER; import static java.util.Objects.requireNonNull; -public class MergeFileWriter +public final class MergeFileWriter implements FileWriter { // The bucketPath looks like this: /root/dir/delta_nnnnnnn_mmmmmmm_ssss/bucket_bbbbb(_aaaa)? diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java index f881ae81d5f2..d840f99f9c8f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java @@ -45,7 +45,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED; import static java.util.Objects.requireNonNull; -public class RcFileFileWriter +public final class RcFileFileWriter implements FileWriter { private static final int INSTANCE_SIZE = instanceSize(RcFileFileWriter.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java index 59b9ee0e096d..61fbcec9a9c4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java @@ -59,7 +59,7 @@ import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; -public class SortingFileWriter +public final class SortingFileWriter implements FileWriter { private static final Logger log = Logger.get(SortingFileWriter.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileWriter.java index 6afed9a0631c..f520c368e787 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileWriter.java @@ -46,7 +46,7 @@ import static io.trino.plugin.hive.avro.AvroHiveFileUtils.getCanonicalToGivenFieldName; import static java.util.Objects.requireNonNull; -public class AvroHiveFileWriter +public final class AvroHiveFileWriter implements FileWriter { private static final int INSTANCE_SIZE = instanceSize(AvroHiveFileWriter.class); diff --git 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LineFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LineFileWriter.java index cd0ca15d0528..49dc21e96ed6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LineFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LineFileWriter.java @@ -34,7 +34,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; import static java.util.Objects.requireNonNull; -public class LineFileWriter +public final class LineFileWriter implements FileWriter { private static final int INSTANCE_SIZE = instanceSize(LineFileWriter.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java index 553ce8255882..e0d6776cb821 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java @@ -62,7 +62,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; -public class OrcFileWriter +public final class OrcFileWriter implements FileWriter { private static final Logger log = Logger.get(OrcFileWriter.class); @@ -182,7 +182,7 @@ public Closeable commit() { try { if (transaction.isAcidTransactionRunning() && useAcidSchema) { - updateUserMetadata(); + updateAcidUserMetadata(); } orcWriter.close(); } @@ -213,28 +213,26 @@ public Closeable commit() return rollbackAction; } - private void updateUserMetadata() + private void updateAcidUserMetadata() { int bucketValue = computeBucketValue(bucketNumber.orElse(0), 0); long writeId = maxWriteId.isPresent() ? maxWriteId.getAsLong() : transaction.getWriteId(); - if (transaction.isAcidTransactionRunning()) { - int stripeRowCount = orcWriter.getStripeRowCount(); - Map userMetadata = new HashMap<>(); - switch (writerKind) { - case INSERT: - userMetadata.put("hive.acid.stats", format("%s,0,0", stripeRowCount)); - break; - case DELETE: - userMetadata.put("hive.acid.stats", format("0,0,%s", stripeRowCount)); - break; - default: - throw new IllegalStateException("In updateUserMetadata, unknown writerKind " + writerKind); - } - userMetadata.put("hive.acid.key.index", format("%s,%s,%s;", writeId, bucketValue, stripeRowCount - 1)); - userMetadata.put("hive.acid.version", "2"); - - orcWriter.updateUserMetadata(userMetadata); + int stripeRowCount = orcWriter.getStripeRowCount(); + Map userMetadata = new HashMap<>(); + switch (writerKind) { + case INSERT: + userMetadata.put("hive.acid.stats", format("%s,0,0", stripeRowCount)); + break; + case DELETE: + userMetadata.put("hive.acid.stats", format("0,0,%s", stripeRowCount)); + break; + default: + throw new IllegalStateException("In updateUserMetadata, unknown writerKind " + writerKind); } + userMetadata.put("hive.acid.key.index", format("%s,%s,%s;", writeId, bucketValue, stripeRowCount - 1)); + userMetadata.put("hive.acid.version", "2"); + + orcWriter.updateUserMetadata(userMetadata); } @Override diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml index 8b7b8545a182..81f18cd509dd 100644 --- a/plugin/trino-hudi/pom.xml +++ b/plugin/trino-hudi/pom.xml @@ -183,14 +183,8 @@ - io.trino - trino-hdfs - runtime - - - - io.trino.hadoop - hadoop-apache + org.jetbrains + annotations runtime @@ -206,6 +200,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive @@ -263,6 +263,12 @@ test + + io.trino.hadoop + hadoop-apache + test + 
+ io.trino.tpch tpch @@ -320,12 +326,6 @@ test - - org.jetbrains - annotations - test - - org.junit.jupiter junit-jupiter-api diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java index 98d1a49cc3e2..21c7a6c03e3e 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java @@ -17,19 +17,15 @@ import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; -import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.Optional; -import static com.google.common.base.Throwables.throwIfUnchecked; import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch; +import static io.trino.plugin.hudi.InternalHudiConnectorFactory.createConnector; public class HudiConnectorFactory implements ConnectorFactory { - public HudiConnectorFactory() - {} - @Override public String getName() { @@ -40,20 +36,6 @@ public String getName() public Connector create(String catalogName, Map config, ConnectorContext context) { checkStrictSpiVersionMatch(context, this); - - ClassLoader classLoader = context.duplicatePluginClassLoader(); - try { - return (Connector) classLoader.loadClass(InternalHudiConnectorFactory.class.getName()) - .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class, Optional.class) - .invoke(null, catalogName, config, context, Optional.empty(), Optional.empty()); - } - catch (InvocationTargetException e) { - Throwable targetException = e.getTargetException(); - throwIfUnchecked(targetException); - throw new RuntimeException(targetException); - } - catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } + return createConnector(catalogName, config, context, Optional.empty(), Optional.empty()); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/InternalHudiConnectorFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/InternalHudiConnectorFactory.java index a65fda489440..9a09752ce57f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/InternalHudiConnectorFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/InternalHudiConnectorFactory.java @@ -70,7 +70,7 @@ public static Connector createConnector( new HiveMetastoreModule(metastore), fileSystemFactory .map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory)) - .orElseGet(FileSystemModule::new), + .orElseGet(() -> new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry())), new MBeanServerModule(), binder -> { binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 32e0f0225426..cb388f332011 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -215,11 +215,6 @@ ${dep.iceberg.version} - - org.apache.iceberg - iceberg-orc - - org.apache.iceberg iceberg-parquet @@ -344,18 +339,6 @@ runtime - - io.trino - trino-hdfs - runtime - - - - io.trino.hadoop - hadoop-apache - runtime - - org.apache.httpcomponents.client5 httpclient5 @@ -467,6 +450,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive @@ -524,6 +513,12 @@ test + + io.trino.hadoop + hadoop-apache + test + + io.trino.hive hive-apache diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java index f0d54a25fd0f..e070f7557a37 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java @@ -38,7 +38,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -public class IcebergAvroFileWriter +public final class IcebergAvroFileWriter implements IcebergFileWriter { private static final int INSTANCE_SIZE = instanceSize(IcebergAvroFileWriter.class); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java index 39e2c9e52617..22aa76c52b52 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java @@ -13,31 +13,30 @@ */ package io.trino.plugin.iceberg; -import com.google.inject.Binder; import com.google.inject.Module; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; -import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.Optional; -import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.inject.util.Modules.EMPTY_MODULE; import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch; +import static io.trino.plugin.iceberg.InternalIcebergConnectorFactory.createConnector; import static java.util.Objects.requireNonNull; public class IcebergConnectorFactory implements ConnectorFactory { - private final Class module; + private final Module module; public IcebergConnectorFactory() { - this(EmptyModule.class); + this(EMPTY_MODULE); } - public IcebergConnectorFactory(Class module) + public IcebergConnectorFactory(Module module) { this.module = requireNonNull(module, "module is null"); } @@ -52,29 +51,6 @@ public String getName() public Connector create(String catalogName, Map config, ConnectorContext context) { checkStrictSpiVersionMatch(context, this); - - ClassLoader classLoader = context.duplicatePluginClassLoader(); - try { - Object moduleInstance = classLoader.loadClass(module.getName()).getConstructor().newInstance(); - Class moduleClass = classLoader.loadClass(Module.class.getName()); - return (Connector) classLoader.loadClass(InternalIcebergConnectorFactory.class.getName()) - .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, moduleClass, Optional.class, Optional.class) - .invoke(null, catalogName, config, context, moduleInstance, Optional.empty(), Optional.empty()); - } - catch (InvocationTargetException e) { - Throwable targetException = e.getTargetException(); - throwIfUnchecked(targetException); - throw new RuntimeException(targetException); - } - catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - - public static class EmptyModule - implements Module - { - @Override - public void configure(Binder binder) {} + return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java index 0ce831abb7e7..6822462387b3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java @@ -40,6 +40,7 @@ public enum IcebergErrorCode ICEBERG_CATALOG_ERROR(13, EXTERNAL), ICEBERG_WRITER_CLOSE_ERROR(14, EXTERNAL), ICEBERG_MISSING_METADATA(15, EXTERNAL), + ICEBERG_WRITER_DATA_ERROR(16, EXTERNAL), /**/; private final ErrorCode errorCode; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java index b80236f45c9c..8da4680c3747 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java @@ -71,8 +71,8 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize; import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate; import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP; -import static io.trino.plugin.iceberg.TypeConverter.toOrcType; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType; import static io.trino.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.Double.parseDouble; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 05b010d30e0e..e90aeb4f72bf 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -288,7 +288,6 @@ import static java.util.function.Function.identity; import static java.util.stream.Collectors.joining; import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations; -import static org.apache.iceberg.ReachableFileUtil.versionHintLocation; import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; @@ -1651,7 +1650,8 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl metadataFileLocations(table, false).stream() .map(IcebergUtil::fileName) .forEach(validMetadataFileNames::add); - validMetadataFileNames.add(fileName(versionHintLocation(table))); + + validMetadataFileNames.add("version-hint.text"); scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validDataFileNames.build(), "data"); scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validMetadataFileNames.build(), "metadata"); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java index 6eb0c19f289b..043769e58be6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java @@ -13,65 +13,60 @@ */ package io.trino.plugin.iceberg; -import 
com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import io.airlift.slice.Slice; +import io.airlift.log.Logger; import io.trino.orc.OrcDataSink; import io.trino.orc.OrcDataSource; -import io.trino.orc.OrcWriteValidation; +import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode; +import io.trino.orc.OrcWriter; import io.trino.orc.OrcWriterOptions; import io.trino.orc.OrcWriterStats; import io.trino.orc.metadata.ColumnMetadata; import io.trino.orc.metadata.CompressionKind; -import io.trino.orc.metadata.OrcColumnId; import io.trino.orc.metadata.OrcType; -import io.trino.orc.metadata.statistics.BooleanStatistics; -import io.trino.orc.metadata.statistics.ColumnStatistics; -import io.trino.orc.metadata.statistics.DateStatistics; -import io.trino.orc.metadata.statistics.DecimalStatistics; -import io.trino.orc.metadata.statistics.DoubleStatistics; -import io.trino.orc.metadata.statistics.IntegerStatistics; -import io.trino.orc.metadata.statistics.StringStatistics; -import io.trino.orc.metadata.statistics.TimestampStatistics; -import io.trino.plugin.hive.WriterKind; -import io.trino.plugin.hive.orc.OrcFileWriter; +import io.trino.spi.Page; +import io.trino.spi.TrinoException; +import io.trino.spi.block.Block; +import io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.type.Type; import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; -import org.apache.iceberg.MetricsModes; -import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.Schema; -import org.apache.iceberg.expressions.Literal; -import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.BinaryUtil; -import org.apache.iceberg.util.UnicodeUtil; import java.io.Closeable; -import java.math.BigDecimal; -import java.nio.ByteBuffer; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; -import java.util.Set; import java.util.function.Supplier; -import static com.google.common.base.Verify.verify; -import static io.trino.orc.metadata.OrcColumnId.ROOT_COLUMN; -import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; -import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY; -import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; -import static java.lang.Math.toIntExact; +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.SizeOf.instanceSize; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_DATA_ERROR; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED; +import static io.trino.plugin.iceberg.util.OrcMetrics.computeMetrics; import static java.util.Objects.requireNonNull; -public class IcebergOrcFileWriter - extends OrcFileWriter +public final class IcebergOrcFileWriter implements IcebergFileWriter { + private static final Logger log = Logger.get(IcebergOrcFileWriter.class); + private static final int INSTANCE_SIZE = instanceSize(IcebergOrcFileWriter.class); + private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean(); + + private final OrcWriter orcWriter; private final Schema 
icebergSchema; private final ColumnMetadata orcColumns; private final MetricsConfig metricsConfig; + private final Closeable rollbackAction; + private final int[] fileInputColumnIndexes; + private final List nullBlocks; + private final Optional> validationInputFactory; + private long validationCpuNanos; public IcebergOrcFileWriter( MetricsConfig metricsConfig, @@ -86,10 +81,29 @@ public IcebergOrcFileWriter( int[] fileInputColumnIndexes, Map metadata, Optional> validationInputFactory, - OrcWriteValidation.OrcWriteValidationMode validationMode, + OrcWriteValidationMode validationMode, OrcWriterStats stats) { - super(orcDataSink, WriterKind.INSERT, NO_ACID_TRANSACTION, false, OptionalInt.empty(), rollbackAction, columnNames, fileColumnTypes, fileColumnOrcTypes, compression, options, fileInputColumnIndexes, metadata, validationInputFactory, validationMode, stats); + requireNonNull(orcDataSink, "orcDataSink is null"); + this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction is null"); + this.fileInputColumnIndexes = requireNonNull(fileInputColumnIndexes, "fileInputColumnIndexes is null"); + + this.nullBlocks = fileColumnTypes.stream() + .map(type -> type.createBlockBuilder(null, 1, 0).appendNull().build()) + .collect(toImmutableList()); + + this.validationInputFactory = validationInputFactory; + this.orcWriter = new OrcWriter( + orcDataSink, + columnNames, + fileColumnTypes, + fileColumnOrcTypes, + compression, + options, + metadata, + validationInputFactory.isPresent(), + validationMode, + stats); this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null"); this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null"); orcColumns = fileColumnOrcTypes; @@ -101,224 +115,98 @@ public Metrics getMetrics() return computeMetrics(metricsConfig, icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats()); } - private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema icebergSchema, ColumnMetadata orcColumns, long fileRowCount, Optional> columnStatistics) + @Override + public long getWrittenBytes() { - if (columnStatistics.isEmpty()) { - return new Metrics(fileRowCount, null, null, null, null, null, null); - } - // Columns that are descendants of LIST or MAP types are excluded because: - // 1. Their stats are not used by Apache Iceberg to filter out data files - // 2. Their record count can be larger than table-level row count. There's no good way to calculate nullCounts for them. 
- // See https://github.com/apache/iceberg/pull/199#discussion_r429443627 - Set excludedColumns = getExcludedColumns(orcColumns); - - ImmutableMap.Builder valueCountsBuilder = ImmutableMap.builder(); - ImmutableMap.Builder nullCountsBuilder = ImmutableMap.builder(); - ImmutableMap.Builder nanCountsBuilder = ImmutableMap.builder(); - ImmutableMap.Builder lowerBoundsBuilder = ImmutableMap.builder(); - ImmutableMap.Builder upperBoundsBuilder = ImmutableMap.builder(); - - // OrcColumnId(0) is the root column that represents file-level schema - for (int i = 1; i < orcColumns.size(); i++) { - OrcColumnId orcColumnId = new OrcColumnId(i); - if (excludedColumns.contains(orcColumnId)) { - continue; - } - OrcType orcColumn = orcColumns.get(orcColumnId); - ColumnStatistics orcColumnStats = columnStatistics.get().get(orcColumnId); - int icebergId = getIcebergId(orcColumn); - Types.NestedField icebergField = icebergSchema.findField(icebergId); - MetricsModes.MetricsMode metricsMode = MetricsUtil.metricsMode(icebergSchema, metricsConfig, icebergId); - if (metricsMode.equals(MetricsModes.None.get())) { - continue; - } - verify(icebergField != null, "Cannot find Iceberg column with ID %s in schema %s", icebergId, icebergSchema); - valueCountsBuilder.put(icebergId, fileRowCount); - if (orcColumnStats.hasNumberOfValues()) { - nullCountsBuilder.put(icebergId, fileRowCount - orcColumnStats.getNumberOfValues()); - } - if (orcColumnStats.getNumberOfNanValues() > 0) { - nanCountsBuilder.put(icebergId, orcColumnStats.getNumberOfNanValues()); - } - - if (!metricsMode.equals(MetricsModes.Counts.get())) { - toIcebergMinMax(orcColumnStats, icebergField.type(), metricsMode).ifPresent(minMax -> { - lowerBoundsBuilder.put(icebergId, minMax.getMin()); - upperBoundsBuilder.put(icebergId, minMax.getMax()); - }); - } - } - Map valueCounts = valueCountsBuilder.buildOrThrow(); - Map nullCounts = nullCountsBuilder.buildOrThrow(); - Map nanCounts = nanCountsBuilder.buildOrThrow(); - Map lowerBounds = lowerBoundsBuilder.buildOrThrow(); - Map upperBounds = upperBoundsBuilder.buildOrThrow(); - return new Metrics( - fileRowCount, - null, // TODO: Add column size accounting to ORC column writers - valueCounts.isEmpty() ? null : valueCounts, - nullCounts.isEmpty() ? null : nullCounts, - nanCounts.isEmpty() ? null : nanCounts, - lowerBounds.isEmpty() ? null : lowerBounds, - upperBounds.isEmpty() ? 
null : upperBounds); + return orcWriter.getWrittenBytes() + orcWriter.getBufferedBytes(); } - private static Set getExcludedColumns(ColumnMetadata orcColumns) + @Override + public long getMemoryUsage() { - ImmutableSet.Builder excludedColumns = ImmutableSet.builder(); - populateExcludedColumns(orcColumns, ROOT_COLUMN, false, excludedColumns); - return excludedColumns.build(); + return INSTANCE_SIZE + orcWriter.getRetainedBytes(); } - private static void populateExcludedColumns(ColumnMetadata orcColumns, OrcColumnId orcColumnId, boolean exclude, ImmutableSet.Builder excludedColumns) + @Override + public void appendRows(Page dataPage) { - if (exclude) { - excludedColumns.add(orcColumnId); - } - OrcType orcColumn = orcColumns.get(orcColumnId); - switch (orcColumn.getOrcTypeKind()) { - case LIST: - case MAP: - for (OrcColumnId child : orcColumn.getFieldTypeIndexes()) { - populateExcludedColumns(orcColumns, child, true, excludedColumns); - } - return; - case STRUCT: - for (OrcColumnId child : orcColumn.getFieldTypeIndexes()) { - populateExcludedColumns(orcColumns, child, exclude, excludedColumns); - } - return; - default: - // unexpected, TODO throw + Block[] blocks = new Block[fileInputColumnIndexes.length]; + for (int i = 0; i < fileInputColumnIndexes.length; i++) { + int inputColumnIndex = fileInputColumnIndexes[i]; + if (inputColumnIndex < 0) { + blocks[i] = RunLengthEncodedBlock.create(nullBlocks.get(i), dataPage.getPositionCount()); + } + else { + blocks[i] = dataPage.getBlock(inputColumnIndex); + } } - } - private static int getIcebergId(OrcType orcColumn) - { - String icebergId = orcColumn.getAttributes().get(ORC_ICEBERG_ID_KEY); - verify(icebergId != null, "ORC column %s doesn't have an associated Iceberg ID", orcColumn); - return Integer.parseInt(icebergId); + Page page = new Page(dataPage.getPositionCount(), blocks); + try { + orcWriter.write(page); + } + catch (IOException | UncheckedIOException e) { + throw new TrinoException(ICEBERG_WRITER_DATA_ERROR, e); + } } - private static Optional toIcebergMinMax(ColumnStatistics orcColumnStats, org.apache.iceberg.types.Type icebergType, MetricsModes.MetricsMode metricsModes) + @Override + public Closeable commit() { - BooleanStatistics booleanStatistics = orcColumnStats.getBooleanStatistics(); - if (booleanStatistics != null) { - boolean hasTrueValues = booleanStatistics.getTrueValueCount() != 0; - boolean hasFalseValues = orcColumnStats.getNumberOfValues() != booleanStatistics.getTrueValueCount(); - return Optional.of(new IcebergMinMax(icebergType, !hasFalseValues, hasTrueValues, metricsModes)); - } - - IntegerStatistics integerStatistics = orcColumnStats.getIntegerStatistics(); - if (integerStatistics != null) { - Object min = integerStatistics.getMin(); - Object max = integerStatistics.getMax(); - if (min == null || max == null) { - return Optional.empty(); - } - if (icebergType.typeId() == org.apache.iceberg.types.Type.TypeID.INTEGER) { - min = toIntExact((Long) min); - max = toIntExact((Long) max); - } - return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes)); + try { + orcWriter.close(); } - DoubleStatistics doubleStatistics = orcColumnStats.getDoubleStatistics(); - if (doubleStatistics != null) { - Object min = doubleStatistics.getMin(); - Object max = doubleStatistics.getMax(); - if (min == null || max == null) { - return Optional.empty(); + catch (IOException | UncheckedIOException e) { + try { + rollbackAction.close(); } - if (icebergType.typeId() == org.apache.iceberg.types.Type.TypeID.FLOAT) { - min = 
((Double) min).floatValue(); - max = ((Double) max).floatValue(); - } - return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes)); - } - StringStatistics stringStatistics = orcColumnStats.getStringStatistics(); - if (stringStatistics != null) { - Slice min = stringStatistics.getMin(); - Slice max = stringStatistics.getMax(); - if (min == null || max == null) { - return Optional.empty(); - } - return Optional.of(new IcebergMinMax(icebergType, min.toStringUtf8(), max.toStringUtf8(), metricsModes)); - } - DateStatistics dateStatistics = orcColumnStats.getDateStatistics(); - if (dateStatistics != null) { - Integer min = dateStatistics.getMin(); - Integer max = dateStatistics.getMax(); - if (min == null || max == null) { - return Optional.empty(); + catch (IOException | RuntimeException ex) { + if (!e.equals(ex)) { + e.addSuppressed(ex); + } + log.error(ex, "Exception when committing file"); } - return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes)); + throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error committing write to ORC file", e); } - DecimalStatistics decimalStatistics = orcColumnStats.getDecimalStatistics(); - if (decimalStatistics != null) { - BigDecimal min = decimalStatistics.getMin(); - BigDecimal max = decimalStatistics.getMax(); - if (min == null || max == null) { - return Optional.empty(); + + if (validationInputFactory.isPresent()) { + try { + try (OrcDataSource input = validationInputFactory.get().get()) { + long startThreadCpuTime = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + orcWriter.validate(input); + validationCpuNanos += THREAD_MX_BEAN.getCurrentThreadCpuTime() - startThreadCpuTime; + } } - min = min.setScale(((Types.DecimalType) icebergType).scale()); - max = max.setScale(((Types.DecimalType) icebergType).scale()); - return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes)); - } - TimestampStatistics timestampStatistics = orcColumnStats.getTimestampStatistics(); - if (timestampStatistics != null) { - Long min = timestampStatistics.getMin(); - Long max = timestampStatistics.getMax(); - if (min == null || max == null) { - return Optional.empty(); + catch (IOException | UncheckedIOException e) { + throw new TrinoException(ICEBERG_WRITE_VALIDATION_FAILED, e); } - // Since ORC timestamp statistics are truncated to millisecond precision, this can cause some column values to fall outside the stats range. - // We are appending 999 microseconds to account for the fact that Trino ORC writer truncates timestamps. 
- return Optional.of(new IcebergMinMax(icebergType, min * MICROSECONDS_PER_MILLISECOND, (max * MICROSECONDS_PER_MILLISECOND) + (MICROSECONDS_PER_MILLISECOND - 1), metricsModes)); } - return Optional.empty(); + + return rollbackAction; } - private static class IcebergMinMax + @Override + public void rollback() { - private ByteBuffer min; - private ByteBuffer max; - - private IcebergMinMax(org.apache.iceberg.types.Type type, Object min, Object max, MetricsModes.MetricsMode metricsMode) - { - if (metricsMode instanceof MetricsModes.Full) { - this.min = Conversions.toByteBuffer(type, min); - this.max = Conversions.toByteBuffer(type, max); - } - else if (metricsMode instanceof MetricsModes.Truncate truncateMode) { - int truncateLength = truncateMode.length(); - switch (type.typeId()) { - case STRING: - this.min = UnicodeUtil.truncateStringMin(Literal.of((CharSequence) min), truncateLength).toByteBuffer(); - this.max = UnicodeUtil.truncateStringMax(Literal.of((CharSequence) max), truncateLength).toByteBuffer(); - break; - case FIXED: - case BINARY: - this.min = BinaryUtil.truncateBinaryMin(Literal.of((ByteBuffer) min), truncateLength).toByteBuffer(); - this.max = BinaryUtil.truncateBinaryMax(Literal.of((ByteBuffer) max), truncateLength).toByteBuffer(); - break; - default: - this.min = Conversions.toByteBuffer(type, min); - this.max = Conversions.toByteBuffer(type, max); - } - } - else { - throw new UnsupportedOperationException("Unsupported metrics mode for Iceberg Max/Min Bound: " + metricsMode); - } + try (rollbackAction) { + orcWriter.close(); } - - public ByteBuffer getMin() - { - return min; + catch (Exception e) { + throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write to ORC file", e); } + } - public ByteBuffer getMax() - { - return max; - } + @Override + public long getValidationCpuNanos() + { + return validationCpuNanos; + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("writer", orcWriter) + .toString(); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index beac48fc875a..1342adb698af 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.graph.Traverser; import com.google.inject.Inject; import io.airlift.slice.Slice; import io.trino.annotation.NotThreadSafe; @@ -39,7 +38,6 @@ import io.trino.orc.OrcRecordReader; import io.trino.orc.TupleDomainOrcPredicate; import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder; -import io.trino.orc.metadata.OrcType; import io.trino.parquet.BloomFilterStore; import io.trino.parquet.Field; import io.trino.parquet.ParquetCorruptionException; @@ -177,10 +175,11 @@ import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; -import static io.trino.plugin.iceberg.TypeConverter.ICEBERG_BINARY_TYPE; -import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY; import static io.trino.plugin.iceberg.delete.EqualityDeleteFilter.readEqualityDeletes; import 
static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes; +import static io.trino.plugin.iceberg.util.OrcIcebergIds.fileColumnsByIcebergId; +import static io.trino.plugin.iceberg.util.OrcTypeConverter.ICEBERG_BINARY_TYPE; +import static io.trino.plugin.iceberg.util.OrcTypeConverter.ORC_ICEBERG_ID_KEY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; import static io.trino.spi.predicate.Utils.nativeValueToBlock; @@ -662,14 +661,7 @@ private static ReaderPageSourceWithRowPositions createOrcPageSource( OrcReader reader = OrcReader.createOrcReader(orcDataSource, options) .orElseThrow(() -> new TrinoException(ICEBERG_BAD_DATA, "ORC file is zero length")); - List fileColumns = reader.getRootColumn().getNestedColumns(); - if (nameMapping.isPresent() && !hasIds(reader.getRootColumn())) { - fileColumns = fileColumns.stream() - .map(orcColumn -> setMissingFieldIds(orcColumn, nameMapping.get(), ImmutableList.of(orcColumn.getColumnName()))) - .collect(toImmutableList()); - } - - Map fileColumnsByIcebergId = mapIdsToOrcFileColumns(fileColumns); + Map fileColumnsByIcebergId = fileColumnsByIcebergId(reader, nameMapping); TupleDomainOrcPredicateBuilder predicateBuilder = TupleDomainOrcPredicate.builder() .setBloomFiltersEnabled(options.isBloomFiltersEnabled()); @@ -803,48 +795,6 @@ else if (orcColumn != null) { } } - private static boolean hasIds(OrcColumn column) - { - if (column.getAttributes().containsKey(ORC_ICEBERG_ID_KEY)) { - return true; - } - - return column.getNestedColumns().stream().anyMatch(IcebergPageSourceProvider::hasIds); - } - - private static OrcColumn setMissingFieldIds(OrcColumn column, NameMapping nameMapping, List qualifiedPath) - { - MappedField mappedField = nameMapping.find(qualifiedPath); - - ImmutableMap.Builder attributes = ImmutableMap.builder() - .putAll(column.getAttributes()); - if (mappedField != null && mappedField.id() != null) { - attributes.put(ORC_ICEBERG_ID_KEY, String.valueOf(mappedField.id())); - } - - return new OrcColumn( - column.getPath(), - column.getColumnId(), - column.getColumnName(), - column.getColumnType(), - column.getOrcDataSourceId(), - column.getNestedColumns().stream() - .map(nestedColumn -> { - ImmutableList.Builder nextQualifiedPath = ImmutableList.builder() - .addAll(qualifiedPath); - if (column.getColumnType() == OrcType.OrcTypeKind.LIST) { - // The Trino ORC reader uses "item" for list element names, but the NameMapper expects "element" - nextQualifiedPath.add("element"); - } - else { - nextQualifiedPath.add(nestedColumn.getColumnName()); - } - return setMissingFieldIds(nestedColumn, nameMapping, nextQualifiedPath.build()); - }) - .collect(toImmutableList()), - attributes.buildOrThrow()); - } - /** * Gets the index based dereference chain to get from the readColumnHandle to the expectedColumnHandle */ @@ -865,20 +815,6 @@ private static List applyProjection(ColumnHandle expectedColumnHandle, return dereferenceChain.build(); } - private static Map mapIdsToOrcFileColumns(List columns) - { - ImmutableMap.Builder columnsById = ImmutableMap.builder(); - Traverser.forTree(OrcColumn::getNestedColumns) - .depthFirstPreOrder(columns) - .forEach(column -> { - String fieldId = column.getAttributes().get(ORC_ICEBERG_ID_KEY); - if (fieldId != null) { - columnsById.put(Integer.parseInt(fieldId), column); - } - }); - return columnsById.buildOrThrow(); - } - private static Integer getIcebergFieldId(OrcColumn column) { String icebergId = 
column.getAttributes().get(ORC_ICEBERG_ID_KEY); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java index 99bc122095f0..1785516600cc 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java @@ -30,7 +30,7 @@ import static java.util.Objects.requireNonNull; -public class IcebergSortingFileWriter +public final class IcebergSortingFileWriter implements IcebergFileWriter { private final IcebergFileWriter outputWriter; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 62696048fc2d..fe71cdbad39b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -26,6 +26,8 @@ import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.util.DefaultLocationProvider; +import io.trino.plugin.iceberg.util.ObjectStoreLocationProvider; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; @@ -151,17 +153,19 @@ import static java.math.RoundingMode.UNNECESSARY; import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; -import static org.apache.iceberg.LocationProviders.locationsFor; import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED; +import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; import static org.apache.iceberg.types.Type.TypeID.BINARY; import static org.apache.iceberg.types.Type.TypeID.FIXED; +import static org.apache.iceberg.util.PropertyUtil.propertyAsBoolean; public final class IcebergUtil { @@ -609,7 +613,12 @@ public static LocationProvider getLocationProvider(SchemaTableName schemaTableNa throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " specifies " + storageProperties.get(WRITE_LOCATION_PROVIDER_IMPL) + " as a location provider. 
Writing to Iceberg tables with custom location provider is not supported."); } - return locationsFor(tableLocation, storageProperties); + + if (propertyAsBoolean(storageProperties, OBJECT_STORE_ENABLED, OBJECT_STORE_ENABLED_DEFAULT)) { + return new ObjectStoreLocationProvider(tableLocation, storageProperties); + } + + return new DefaultLocationProvider(tableLocation, storageProperties); } public static Schema schemaFromMetadata(List columns) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java index 6ea876634e07..0ebc1bd4378b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java @@ -89,7 +89,7 @@ public static Connector createConnector( new MBeanServerModule(), fileSystemFactory .map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory)) - .orElseGet(FileSystemModule::new), + .orElseGet(() -> new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry())), binder -> { binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); binder.bind(Tracer.class).toInstance(context.getTracer()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java index 5840560106c3..1380f6676c11 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java @@ -14,11 +14,6 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.trino.orc.metadata.ColumnMetadata; -import io.trino.orc.metadata.OrcColumnId; -import io.trino.orc.metadata.OrcType; -import io.trino.orc.metadata.OrcType.OrcTypeKind; import io.trino.spi.TrinoException; import io.trino.spi.type.ArrayType; import io.trino.spi.type.BigintType; @@ -41,12 +36,10 @@ import io.trino.spi.type.UuidType; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; -import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; @@ -61,11 +54,6 @@ public final class TypeConverter { - public static final String ORC_ICEBERG_ID_KEY = "iceberg.id"; - public static final String ORC_ICEBERG_REQUIRED_KEY = "iceberg.required"; - public static final String ICEBERG_LONG_TYPE = "iceberg.long-type"; - public static final String ICEBERG_BINARY_TYPE = "iceberg.binary-type"; - private TypeConverter() {} public static Type toTrinoType(org.apache.iceberg.types.Type type, TypeManager typeManager) @@ -246,142 +234,4 @@ private static void checkExactlyOne(Optional columnIdentity, Opt } throw new IllegalArgumentException("Either a column identity or nextFieldId is expected"); } - - public static ColumnMetadata toOrcType(Schema schema) - { - return new ColumnMetadata<>(toOrcStructType(0, schema.asStruct(), ImmutableMap.of())); - } - - private static List toOrcType(int nextFieldTypeIndex, org.apache.iceberg.types.Type type, Map attributes) - { - switch (type.typeId()) { - case BOOLEAN: - return ImmutableList.of(new 
OrcType(OrcTypeKind.BOOLEAN, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case INTEGER: - return ImmutableList.of(new OrcType(OrcTypeKind.INT, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case LONG: - return ImmutableList.of(new OrcType(OrcTypeKind.LONG, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case FLOAT: - return ImmutableList.of(new OrcType(OrcTypeKind.FLOAT, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case DOUBLE: - return ImmutableList.of(new OrcType(OrcTypeKind.DOUBLE, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case DATE: - return ImmutableList.of(new OrcType(OrcTypeKind.DATE, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case TIME: - attributes = ImmutableMap.builder() - .putAll(attributes) - .put(ICEBERG_LONG_TYPE, "TIME") - .buildOrThrow(); - return ImmutableList.of(new OrcType(OrcTypeKind.LONG, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case TIMESTAMP: - OrcTypeKind timestampKind = ((Types.TimestampType) type).shouldAdjustToUTC() ? OrcTypeKind.TIMESTAMP_INSTANT : OrcTypeKind.TIMESTAMP; - return ImmutableList.of(new OrcType(timestampKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case STRING: - return ImmutableList.of(new OrcType(OrcTypeKind.STRING, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case FIXED: - return ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case BINARY: - return ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case DECIMAL: - Types.DecimalType decimalType = (Types.DecimalType) type; - return ImmutableList.of(new OrcType(OrcTypeKind.DECIMAL, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.of(decimalType.precision()), Optional.of(decimalType.scale()), attributes)); - case UUID: - attributes = ImmutableMap.builder() - .putAll(attributes) - .put(ICEBERG_BINARY_TYPE, "UUID") - .buildOrThrow(); - return ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - case STRUCT: - return toOrcStructType(nextFieldTypeIndex, (Types.StructType) type, attributes); - case LIST: - return toOrcListType(nextFieldTypeIndex, (Types.ListType) type, attributes); - case MAP: - return toOrcMapType(nextFieldTypeIndex, (Types.MapType) type, attributes); - } - throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: " + type); - } - - private static List toOrcStructType(int nextFieldTypeIndex, Types.StructType structType, Map attributes) - { - nextFieldTypeIndex++; - List fieldTypeIndexes = new ArrayList<>(); - List fieldNames = new ArrayList<>(); - List> fieldTypesList = new ArrayList<>(); - for (Types.NestedField field : structType.fields()) { - fieldTypeIndexes.add(new OrcColumnId(nextFieldTypeIndex)); - fieldNames.add(field.name()); - Map fieldAttributes = 
ImmutableMap.builder() - .put(ORC_ICEBERG_ID_KEY, Integer.toString(field.fieldId())) - .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(field.isRequired())) - .buildOrThrow(); - List fieldOrcTypes = toOrcType(nextFieldTypeIndex, field.type(), fieldAttributes); - fieldTypesList.add(fieldOrcTypes); - nextFieldTypeIndex += fieldOrcTypes.size(); - } - - ImmutableList.Builder orcTypes = ImmutableList.builder(); - orcTypes.add(new OrcType( - OrcTypeKind.STRUCT, - fieldTypeIndexes, - fieldNames, - Optional.empty(), - Optional.empty(), - Optional.empty(), - attributes)); - fieldTypesList.forEach(orcTypes::addAll); - - return orcTypes.build(); - } - - private static List toOrcListType(int nextFieldTypeIndex, Types.ListType listType, Map attributes) - { - nextFieldTypeIndex++; - Map elementAttributes = ImmutableMap.builder() - .put(ORC_ICEBERG_ID_KEY, Integer.toString(listType.elementId())) - .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(listType.isElementRequired())) - .buildOrThrow(); - List itemTypes = toOrcType(nextFieldTypeIndex, listType.elementType(), elementAttributes); - - List orcTypes = new ArrayList<>(); - orcTypes.add(new OrcType( - OrcTypeKind.LIST, - ImmutableList.of(new OrcColumnId(nextFieldTypeIndex)), - ImmutableList.of("item"), - Optional.empty(), - Optional.empty(), - Optional.empty(), - attributes)); - - orcTypes.addAll(itemTypes); - return orcTypes; - } - - private static List toOrcMapType(int nextFieldTypeIndex, Types.MapType mapType, Map attributes) - { - nextFieldTypeIndex++; - Map keyAttributes = ImmutableMap.builder() - .put(ORC_ICEBERG_ID_KEY, Integer.toString(mapType.keyId())) - .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(true)) - .buildOrThrow(); - List keyTypes = toOrcType(nextFieldTypeIndex, mapType.keyType(), keyAttributes); - Map valueAttributes = ImmutableMap.builder() - .put(ORC_ICEBERG_ID_KEY, Integer.toString(mapType.valueId())) - .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(mapType.isValueRequired())) - .buildOrThrow(); - List valueTypes = toOrcType(nextFieldTypeIndex + keyTypes.size(), mapType.valueType(), valueAttributes); - - List orcTypes = new ArrayList<>(); - orcTypes.add(new OrcType( - OrcTypeKind.MAP, - ImmutableList.of(new OrcColumnId(nextFieldTypeIndex), new OrcColumnId(nextFieldTypeIndex + keyTypes.size())), - ImmutableList.of("key", "value"), - Optional.empty(), - Optional.empty(), - Optional.empty(), - attributes)); - - orcTypes.addAll(keyTypes); - orcTypes.addAll(valueTypes); - return orcTypes; - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java index 6c57853c9e4b..91283376cbb6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java @@ -25,6 +25,10 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; +import io.trino.parquet.ParquetDataSource; +import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.reader.MetadataReader; +import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveStorageFormat; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.HiveMetastore; @@ -33,6 +37,7 @@ import io.trino.plugin.hive.metastore.PrincipalPrivileges; import 
io.trino.plugin.hive.metastore.RawHiveMetastoreFactory; import io.trino.plugin.hive.metastore.Storage; +import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergFileFormat; import io.trino.plugin.iceberg.IcebergSecurityConfig; @@ -40,6 +45,7 @@ import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.plugin.iceberg.fileio.ForwardingInputFile; +import io.trino.plugin.iceberg.util.OrcMetrics; import io.trino.spi.TrinoException; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.connector.ConnectorSession; @@ -63,15 +69,15 @@ import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; -import org.apache.iceberg.orc.OrcMetrics; import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import java.io.IOException; +import java.io.UncheckedIOException; import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.HashMap; @@ -79,6 +85,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -225,7 +232,7 @@ public void doMigrate(ConnectorSession session, String schemaName, String tableN ImmutableList.Builder dataFilesBuilder = ImmutableList.builder(); if (hiveTable.getPartitionColumns().isEmpty()) { log.debug("Building data files from %s", location); - dataFilesBuilder.addAll(buildDataFiles(session, recursive, storageFormat, location, partitionSpec, new PartitionData(new Object[]{}), nameMapping)); + dataFilesBuilder.addAll(buildDataFiles(session, recursive, storageFormat, location, partitionSpec, new PartitionData(new Object[]{}), schema)); } else { Map> partitions = listAllPartitions(metastore, hiveTable); @@ -235,7 +242,7 @@ public void doMigrate(ConnectorSession session, String schemaName, String tableN log.debug("Building data files from '%s' for partition %d of %d", storage.getLocation(), fileCount++, partitions.size()); HiveStorageFormat partitionStorageFormat = extractHiveStorageFormat(storage.getStorageFormat()); StructLike partitionData = DataFiles.data(partitionSpec, partition.getKey()); - dataFilesBuilder.addAll(buildDataFiles(session, recursive, partitionStorageFormat, storage.getLocation(), partitionSpec, partitionData, nameMapping)); + dataFilesBuilder.addAll(buildDataFiles(session, recursive, partitionStorageFormat, storage.getLocation(), partitionSpec, partitionData, schema)); } } @@ -330,7 +337,14 @@ public Map> listAllPartitions(HiveMetastore metastor return metastore.getPartitionsByNames(table, partitions.get()); } - private List buildDataFiles(ConnectorSession session, RecursiveDirectory recursive, HiveStorageFormat format, String location, PartitionSpec partitionSpec, StructLike partition, NameMapping nameMapping) + private List buildDataFiles( + ConnectorSession session, + RecursiveDirectory recursive, + HiveStorageFormat format, + String location, + PartitionSpec partitionSpec, + StructLike partition, + Schema schema) throws IOException { // TODO: Introduce 
parallelism @@ -351,7 +365,7 @@ private List buildDataFiles(ConnectorSession session, RecursiveDirecto throw new TrinoException(NOT_SUPPORTED, "Recursive directory must not exist when recursive_directory argument is 'fail': " + file.location()); } - Metrics metrics = loadMetrics(fileSystem.newInputFile(file.location()), format, nameMapping); + Metrics metrics = loadMetrics(fileSystem.newInputFile(file.location()), format, schema); DataFile dataFile = buildDataFile(file, partition, partitionSpec, format.name(), metrics); dataFilesBuilder.add(dataFile); } @@ -377,17 +391,27 @@ private static IcebergFileFormat toIcebergFileFormat(HiveStorageFormat storageFo }; } - private static Metrics loadMetrics(TrinoInputFile file, HiveStorageFormat storageFormat, NameMapping nameMapping) + private static Metrics loadMetrics(TrinoInputFile file, HiveStorageFormat storageFormat, Schema schema) { - InputFile inputFile = new ForwardingInputFile(file); return switch (storageFormat) { - case ORC -> OrcMetrics.fromInputFile(inputFile, METRICS_CONFIG, nameMapping); - case PARQUET -> ParquetUtil.fileMetrics(inputFile, METRICS_CONFIG, nameMapping); - case AVRO -> new Metrics(Avro.rowCount(inputFile), null, null, null, null); + case ORC -> OrcMetrics.fileMetrics(file, METRICS_CONFIG, schema); + case PARQUET -> parquetMetrics(file, METRICS_CONFIG, MappingUtil.create(schema)); + case AVRO -> new Metrics(Avro.rowCount(new ForwardingInputFile(file)), null, null, null, null); default -> throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format: " + storageFormat); }; } + private static Metrics parquetMetrics(TrinoInputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) + { + try (ParquetDataSource dataSource = new TrinoParquetDataSource(file, new ParquetReaderOptions(), new FileFormatDataSourceStats())) { + ParquetMetadata metadata = MetadataReader.readFooter(dataSource, Optional.empty()); + return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig, nameMapping); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to read file footer: " + file.location(), e); + } + } + private static List toPartitionFields(io.trino.plugin.hive.metastore.Table table) { ImmutableList.Builder fields = ImmutableList.builder(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/DefaultLocationProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/DefaultLocationProvider.java new file mode 100644 index 000000000000..a63b3ca54066 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/DefaultLocationProvider.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.plugin.iceberg.util;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.LocationProvider;
+
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
+
+// based on org.apache.iceberg.LocationProviders.DefaultLocationProvider
+public class DefaultLocationProvider
+        implements LocationProvider
+{
+    private final String dataLocation;
+
+    public DefaultLocationProvider(String tableLocation, Map<String, String> properties)
+    {
+        this.dataLocation = stripTrailingSlash(dataLocation(properties, tableLocation));
+    }
+
+    @SuppressWarnings("deprecation")
+    private static String dataLocation(Map<String, String> properties, String tableLocation)
+    {
+        String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
+        if (dataLocation == null) {
+            dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
+            if (dataLocation == null) {
+                dataLocation = format("%s/data", stripTrailingSlash(tableLocation));
+            }
+        }
+        return dataLocation;
+    }
+
+    @Override
+    public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename)
+    {
+        return "%s/%s/%s".formatted(dataLocation, spec.partitionToPath(partitionData), filename);
+    }
+
+    @Override
+    public String newDataLocation(String filename)
+    {
+        return "%s/%s".formatted(dataLocation, filename);
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ObjectStoreLocationProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ObjectStoreLocationProvider.java
new file mode 100644
index 000000000000..fb552c381d87
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ObjectStoreLocationProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
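For context on the two providers introduced here: DefaultLocationProvider (above) places data files under `<table location>/data` unless TableProperties.WRITE_DATA_LOCATION (or the deprecated WRITE_FOLDER_STORAGE_LOCATION) overrides it, while ObjectStoreLocationProvider (added below) additionally prefixes each file name with a murmur3 hash so that files spread across storage key prefixes. A minimal sketch of the resulting paths, assuming a hypothetical table location of s3://bucket/db/sales and no location-related table properties set:

```java
// Minimal sketch (not part of the change): expected path shapes from the two new providers,
// for a hypothetical table at s3://bucket/db/sales with no overriding table properties.
import io.trino.plugin.iceberg.util.DefaultLocationProvider;
import io.trino.plugin.iceberg.util.ObjectStoreLocationProvider;
import org.apache.iceberg.io.LocationProvider;

import java.util.Map;

class LocationProviderSketch
{
    public static void main(String[] args)
    {
        LocationProvider plain = new DefaultLocationProvider("s3://bucket/db/sales", Map.of());
        System.out.println(plain.newDataLocation("part-0.orc"));
        // -> s3://bucket/db/sales/data/part-0.orc

        LocationProvider hashed = new ObjectStoreLocationProvider("s3://bucket/db/sales", Map.of());
        System.out.println(hashed.newDataLocation("part-0.orc"));
        // -> s3://bucket/db/sales/data/<base64-url murmur3 of "part-0.orc">/part-0.orc
        // (a "<db>/<table>" context segment is inserted only when the data location
        //  lies outside the table location prefix)
    }
}
```

The hash prefix mirrors Iceberg's own object-store location provider and mainly helps object stores avoid hot key prefixes under heavy write load.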
+ */ +package io.trino.plugin.iceberg.util; + +import com.google.common.hash.HashFunction; +import io.trino.filesystem.Location; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.io.LocationProvider; + +import java.util.Base64; +import java.util.Map; + +import static com.google.common.hash.Hashing.murmur3_32_fixed; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash; + +// based on org.apache.iceberg.LocationProviders.ObjectStoreLocationProvider +public class ObjectStoreLocationProvider + implements LocationProvider +{ + private static final HashFunction HASH_FUNC = murmur3_32_fixed(); + private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding(); + private final String storageLocation; + private final String context; + + public ObjectStoreLocationProvider(String tableLocation, Map properties) + { + this.storageLocation = stripTrailingSlash(dataLocation(properties, tableLocation)); + // if the storage location is within the table prefix, don't add table and database name context + this.context = storageLocation.startsWith(stripTrailingSlash(tableLocation)) ? null : pathContext(tableLocation); + } + + @SuppressWarnings("deprecation") + private static String dataLocation(Map properties, String tableLocation) + { + String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + if (dataLocation == null) { + dataLocation = "%s/data".formatted(stripTrailingSlash(tableLocation)); + } + } + } + return dataLocation; + } + + @Override + public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) + { + return newDataLocation("%s/%s".formatted(spec.partitionToPath(partitionData), filename)); + } + + @Override + public String newDataLocation(String filename) + { + String hash = computeHash(filename); + if (context != null) { + return "%s/%s/%s/%s".formatted(storageLocation, hash, context, filename); + } + return "%s/%s/%s".formatted(storageLocation, hash, filename); + } + + private static String pathContext(String tableLocation) + { + Location location; + String name; + try { + location = Location.of(stripTrailingSlash(tableLocation)); + name = location.fileName(); + } + catch (IllegalArgumentException | IllegalStateException e) { + return null; + } + + try { + String parent = stripTrailingSlash(location.parentDirectory().path()); + parent = parent.substring(parent.lastIndexOf('/') + 1); + return "%s/%s".formatted(parent, name); + } + catch (IllegalArgumentException | IllegalStateException e) { + return name; + } + } + + private static String computeHash(String fileName) + { + byte[] bytes = HASH_FUNC.hashString(fileName, UTF_8).asBytes(); + return BASE64_ENCODER.encodeToString(bytes); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcIcebergIds.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcIcebergIds.java new file mode 100644 index 000000000000..0143bb50198f --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcIcebergIds.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file 
except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.util; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.graph.Traverser; +import io.trino.orc.OrcColumn; +import io.trino.orc.OrcReader; +import io.trino.orc.metadata.OrcType.OrcTypeKind; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.iceberg.util.OrcTypeConverter.ORC_ICEBERG_ID_KEY; + +public final class OrcIcebergIds +{ + private OrcIcebergIds() {} + + public static Map fileColumnsByIcebergId(OrcReader reader, Optional nameMapping) + { + List fileColumns = reader.getRootColumn().getNestedColumns(); + + if (nameMapping.isPresent() && !hasIds(reader.getRootColumn())) { + fileColumns = fileColumns.stream() + .map(orcColumn -> setMissingFieldIds(orcColumn, nameMapping.get(), ImmutableList.of(orcColumn.getColumnName()))) + .collect(toImmutableList()); + } + + return mapIdsToOrcFileColumns(fileColumns); + } + + private static boolean hasIds(OrcColumn column) + { + if (column.getAttributes().containsKey(ORC_ICEBERG_ID_KEY)) { + return true; + } + + return column.getNestedColumns().stream().anyMatch(OrcIcebergIds::hasIds); + } + + private static OrcColumn setMissingFieldIds(OrcColumn column, NameMapping nameMapping, List qualifiedPath) + { + MappedField mappedField = nameMapping.find(qualifiedPath); + + ImmutableMap.Builder attributes = ImmutableMap.builder(); + attributes.putAll(column.getAttributes()); + if ((mappedField != null) && (mappedField.id() != null)) { + attributes.put(ORC_ICEBERG_ID_KEY, String.valueOf(mappedField.id())); + } + + List orcColumns = column.getNestedColumns().stream() + .map(nestedColumn -> setMissingFieldIds(nestedColumn, nameMapping, ImmutableList.builder() + .addAll(qualifiedPath) + .add(pathName(column, nestedColumn)) + .build())) + .collect(toImmutableList()); + + return new OrcColumn( + column.getPath(), + column.getColumnId(), + column.getColumnName(), + column.getColumnType(), + column.getOrcDataSourceId(), + orcColumns, + attributes.buildOrThrow()); + } + + private static String pathName(OrcColumn column, OrcColumn nestedColumn) + { + // Trino ORC reader uses "item" for list element names, but NameMapper expects "element" + if (column.getColumnType() == OrcTypeKind.LIST) { + return "element"; + } + return nestedColumn.getColumnName(); + } + + private static Map mapIdsToOrcFileColumns(List columns) + { + ImmutableMap.Builder columnsById = ImmutableMap.builder(); + Traverser.forTree(OrcColumn::getNestedColumns) + .depthFirstPreOrder(columns) + .forEach(column -> { + String fieldId = column.getAttributes().get(ORC_ICEBERG_ID_KEY); + if (fieldId != null) { + columnsById.put(Integer.parseInt(fieldId), column); + } + }); + return columnsById.buildOrThrow(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcMetrics.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcMetrics.java new file mode 100644 index 000000000000..aaa785d75c49 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcMetrics.java @@ -0,0 +1,350 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.util; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; +import io.trino.filesystem.TrinoInputFile; +import io.trino.orc.OrcColumn; +import io.trino.orc.OrcDataSource; +import io.trino.orc.OrcReader; +import io.trino.orc.OrcReaderOptions; +import io.trino.orc.metadata.ColumnMetadata; +import io.trino.orc.metadata.Footer; +import io.trino.orc.metadata.OrcColumnId; +import io.trino.orc.metadata.OrcType; +import io.trino.orc.metadata.statistics.BooleanStatistics; +import io.trino.orc.metadata.statistics.ColumnStatistics; +import io.trino.orc.metadata.statistics.DateStatistics; +import io.trino.orc.metadata.statistics.DecimalStatistics; +import io.trino.orc.metadata.statistics.DoubleStatistics; +import io.trino.orc.metadata.statistics.IntegerStatistics; +import io.trino.orc.metadata.statistics.StringStatistics; +import io.trino.orc.metadata.statistics.TimestampStatistics; +import io.trino.plugin.hive.FileFormatDataSourceStats; +import io.trino.plugin.iceberg.TrinoOrcDataSource; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.MetricsModes; +import org.apache.iceberg.MetricsUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.BinaryUtil; +import org.apache.iceberg.util.UnicodeUtil; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.orc.OrcReader.createOrcReader; +import static io.trino.orc.metadata.OrcColumnId.ROOT_COLUMN; +import static io.trino.plugin.iceberg.util.OrcIcebergIds.fileColumnsByIcebergId; +import static io.trino.plugin.iceberg.util.OrcTypeConverter.ORC_ICEBERG_ID_KEY; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; +import static java.lang.Math.toIntExact; +import static java.util.function.Function.identity; + +public final class OrcMetrics +{ + private OrcMetrics() {} + + public static Metrics fileMetrics(TrinoInputFile file, 
MetricsConfig metricsConfig, Schema schema) + { + OrcReaderOptions options = new OrcReaderOptions(); + try (OrcDataSource dataSource = new TrinoOrcDataSource(file, options, new FileFormatDataSourceStats())) { + Optional reader = createOrcReader(dataSource, options); + if (reader.isEmpty()) { + return new Metrics(0L, null, null, null, null); + } + Footer footer = reader.get().getFooter(); + + // use name mapping to compute missing Iceberg field IDs + Optional nameMapping = Optional.of(MappingUtil.create(schema)); + Map mappedColumns = fileColumnsByIcebergId(reader.get(), nameMapping) + .values().stream() + .collect(toImmutableMap(OrcColumn::getColumnId, identity())); + + // rebuild type list with mapped columns + List mappedTypes = new ArrayList<>(); + ColumnMetadata types = footer.getTypes(); + for (int i = 0; i < types.size(); i++) { + OrcColumnId id = new OrcColumnId(i); + mappedTypes.add(Optional.ofNullable(mappedColumns.get(id)) + .map(OrcMetrics::toBasicOrcType) + .orElseGet(() -> types.get(id))); + } + + return computeMetrics(metricsConfig, schema, new ColumnMetadata<>(mappedTypes), footer.getNumberOfRows(), footer.getFileStats()); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to read file footer: " + file.location(), e); + } + } + + private static OrcType toBasicOrcType(OrcColumn column) + { + return new OrcType( + column.getColumnType(), + column.getNestedColumns().stream() + .map(OrcColumn::getColumnId) + .collect(toImmutableList()), + null, + Optional.empty(), + Optional.empty(), + Optional.empty(), + column.getAttributes()); + } + + public static Metrics computeMetrics( + MetricsConfig metricsConfig, + Schema icebergSchema, + ColumnMetadata orcColumns, + long fileRowCount, + Optional> columnStatistics) + { + if (columnStatistics.isEmpty()) { + return new Metrics(fileRowCount, null, null, null, null, null, null); + } + // Columns that are descendants of LIST or MAP types are excluded because: + // 1. Their stats are not used by Apache Iceberg to filter out data files + // 2. Their record count can be larger than table-level row count. There's no good way to calculate nullCounts for them. 
+ // See https://github.com/apache/iceberg/pull/199#discussion_r429443627 + Set excludedColumns = getExcludedColumns(orcColumns); + + ImmutableMap.Builder valueCountsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder nullCountsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder nanCountsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder lowerBoundsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder upperBoundsBuilder = ImmutableMap.builder(); + + // OrcColumnId(0) is the root column that represents file-level schema + for (int i = 1; i < orcColumns.size(); i++) { + OrcColumnId orcColumnId = new OrcColumnId(i); + if (excludedColumns.contains(orcColumnId)) { + continue; + } + OrcType orcColumn = orcColumns.get(orcColumnId); + ColumnStatistics orcColumnStats = columnStatistics.get().get(orcColumnId); + int icebergId = getIcebergId(orcColumn); + Types.NestedField icebergField = icebergSchema.findField(icebergId); + MetricsModes.MetricsMode metricsMode = MetricsUtil.metricsMode(icebergSchema, metricsConfig, icebergId); + if (metricsMode.equals(MetricsModes.None.get())) { + continue; + } + verify(icebergField != null, "Cannot find Iceberg column with ID %s in schema %s", icebergId, icebergSchema); + valueCountsBuilder.put(icebergId, fileRowCount); + if (orcColumnStats.hasNumberOfValues()) { + nullCountsBuilder.put(icebergId, fileRowCount - orcColumnStats.getNumberOfValues()); + } + if (orcColumnStats.getNumberOfNanValues() > 0) { + nanCountsBuilder.put(icebergId, orcColumnStats.getNumberOfNanValues()); + } + + if (!metricsMode.equals(MetricsModes.Counts.get())) { + toIcebergMinMax(orcColumnStats, icebergField.type(), metricsMode).ifPresent(minMax -> { + lowerBoundsBuilder.put(icebergId, minMax.getMin()); + upperBoundsBuilder.put(icebergId, minMax.getMax()); + }); + } + } + Map valueCounts = valueCountsBuilder.buildOrThrow(); + Map nullCounts = nullCountsBuilder.buildOrThrow(); + Map nanCounts = nanCountsBuilder.buildOrThrow(); + Map lowerBounds = lowerBoundsBuilder.buildOrThrow(); + Map upperBounds = upperBoundsBuilder.buildOrThrow(); + return new Metrics( + fileRowCount, + null, // TODO: Add column size accounting to ORC column writers + valueCounts.isEmpty() ? null : valueCounts, + nullCounts.isEmpty() ? null : nullCounts, + nanCounts.isEmpty() ? null : nanCounts, + lowerBounds.isEmpty() ? null : lowerBounds, + upperBounds.isEmpty() ? 
null : upperBounds); + } + + private static Set getExcludedColumns(ColumnMetadata orcColumns) + { + ImmutableSet.Builder excludedColumns = ImmutableSet.builder(); + populateExcludedColumns(orcColumns, ROOT_COLUMN, false, excludedColumns); + return excludedColumns.build(); + } + + private static void populateExcludedColumns(ColumnMetadata orcColumns, OrcColumnId orcColumnId, boolean exclude, ImmutableSet.Builder excludedColumns) + { + if (exclude) { + excludedColumns.add(orcColumnId); + } + OrcType orcColumn = orcColumns.get(orcColumnId); + switch (orcColumn.getOrcTypeKind()) { + case LIST: + case MAP: + for (OrcColumnId child : orcColumn.getFieldTypeIndexes()) { + populateExcludedColumns(orcColumns, child, true, excludedColumns); + } + return; + case STRUCT: + for (OrcColumnId child : orcColumn.getFieldTypeIndexes()) { + populateExcludedColumns(orcColumns, child, exclude, excludedColumns); + } + return; + default: + // unexpected, TODO throw + } + } + + private static int getIcebergId(OrcType orcColumn) + { + String icebergId = orcColumn.getAttributes().get(ORC_ICEBERG_ID_KEY); + verify(icebergId != null, "ORC column %s doesn't have an associated Iceberg ID", orcColumn); + return Integer.parseInt(icebergId); + } + + private static Optional toIcebergMinMax(ColumnStatistics orcColumnStats, Type icebergType, MetricsModes.MetricsMode metricsModes) + { + BooleanStatistics booleanStatistics = orcColumnStats.getBooleanStatistics(); + if (booleanStatistics != null) { + boolean hasTrueValues = booleanStatistics.getTrueValueCount() != 0; + boolean hasFalseValues = orcColumnStats.getNumberOfValues() != booleanStatistics.getTrueValueCount(); + return Optional.of(new IcebergMinMax(icebergType, !hasFalseValues, hasTrueValues, metricsModes)); + } + + IntegerStatistics integerStatistics = orcColumnStats.getIntegerStatistics(); + if (integerStatistics != null) { + Object min = integerStatistics.getMin(); + Object max = integerStatistics.getMax(); + if (min == null || max == null) { + return Optional.empty(); + } + if (icebergType.typeId() == TypeID.INTEGER) { + min = toIntExact((Long) min); + max = toIntExact((Long) max); + } + return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes)); + } + DoubleStatistics doubleStatistics = orcColumnStats.getDoubleStatistics(); + if (doubleStatistics != null) { + Object min = doubleStatistics.getMin(); + Object max = doubleStatistics.getMax(); + if (min == null || max == null) { + return Optional.empty(); + } + if (icebergType.typeId() == TypeID.FLOAT) { + min = ((Double) min).floatValue(); + max = ((Double) max).floatValue(); + } + return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes)); + } + StringStatistics stringStatistics = orcColumnStats.getStringStatistics(); + if (stringStatistics != null) { + Slice min = stringStatistics.getMin(); + Slice max = stringStatistics.getMax(); + if (min == null || max == null) { + return Optional.empty(); + } + return Optional.of(new IcebergMinMax(icebergType, min.toStringUtf8(), max.toStringUtf8(), metricsModes)); + } + DateStatistics dateStatistics = orcColumnStats.getDateStatistics(); + if (dateStatistics != null) { + Integer min = dateStatistics.getMin(); + Integer max = dateStatistics.getMax(); + if (min == null || max == null) { + return Optional.empty(); + } + return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes)); + } + DecimalStatistics decimalStatistics = orcColumnStats.getDecimalStatistics(); + if (decimalStatistics != null) { + BigDecimal min = 
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcTypeConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcTypeConverter.java
new file mode 100644
index 000000000000..744f17d596c1
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcTypeConverter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.orc.metadata.ColumnMetadata;
+import io.trino.orc.metadata.OrcColumnId;
+import io.trino.orc.metadata.OrcType;
+import io.trino.orc.metadata.OrcType.OrcTypeKind;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimestampType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public final class OrcTypeConverter
+{
+    public static final String ORC_ICEBERG_ID_KEY = "iceberg.id";
+    public static final String ORC_ICEBERG_REQUIRED_KEY = "iceberg.required";
+    public static final String ICEBERG_LONG_TYPE = "iceberg.long-type";
+    public static final String ICEBERG_BINARY_TYPE = "iceberg.binary-type";
+
+    private OrcTypeConverter() {}
+
+    public static ColumnMetadata<OrcType> toOrcType(Schema schema)
+    {
+        return new ColumnMetadata<>(toOrcStructType(0, schema.asStruct(), ImmutableMap.of()));
+    }
+
+    private static List<OrcType> toOrcType(int nextFieldTypeIndex, Type type, Map<String, String> attributes)
+    {
+        return switch (type.typeId()) {
+            case BOOLEAN -> ImmutableList.of(new OrcType(OrcTypeKind.BOOLEAN, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case INTEGER -> ImmutableList.of(new OrcType(OrcTypeKind.INT, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case LONG -> ImmutableList.of(new OrcType(OrcTypeKind.LONG, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case FLOAT -> ImmutableList.of(new OrcType(OrcTypeKind.FLOAT, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case DOUBLE -> ImmutableList.of(new OrcType(OrcTypeKind.DOUBLE, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case DATE -> ImmutableList.of(new OrcType(OrcTypeKind.DATE, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case TIME -> {
+                attributes = ImmutableMap.<String, String>builder()
+                        .putAll(attributes)
+                        .put(ICEBERG_LONG_TYPE, "TIME")
+                        .buildOrThrow();
+                yield ImmutableList.of(new OrcType(OrcTypeKind.LONG, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            }
+            case TIMESTAMP -> {
+                OrcTypeKind timestampKind = ((TimestampType) type).shouldAdjustToUTC() ? OrcTypeKind.TIMESTAMP_INSTANT : OrcTypeKind.TIMESTAMP;
+                yield ImmutableList.of(new OrcType(timestampKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            }
+            case STRING -> ImmutableList.of(new OrcType(OrcTypeKind.STRING, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case FIXED, BINARY -> ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            case DECIMAL -> {
+                DecimalType decimalType = (DecimalType) type;
+                yield ImmutableList.of(new OrcType(OrcTypeKind.DECIMAL, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.of(decimalType.precision()), Optional.of(decimalType.scale()), attributes));
+            }
+            case UUID -> {
+                attributes = ImmutableMap.<String, String>builder()
+                        .putAll(attributes)
+                        .put(ICEBERG_BINARY_TYPE, "UUID")
+                        .buildOrThrow();
+                yield ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+            }
+            case STRUCT -> toOrcStructType(nextFieldTypeIndex, (StructType) type, attributes);
+            case LIST -> toOrcListType(nextFieldTypeIndex, (ListType) type, attributes);
+            case MAP -> toOrcMapType(nextFieldTypeIndex, (MapType) type, attributes);
+        };
+    }
+
+    private static List<OrcType> toOrcStructType(int nextFieldTypeIndex, StructType structType, Map<String, String> attributes)
+    {
+        nextFieldTypeIndex++;
+
+        List<OrcColumnId> fieldTypeIndexes = new ArrayList<>();
+        List<String> fieldNames = new ArrayList<>();
+        List<List<OrcType>> fieldTypesList = new ArrayList<>();
+        for (NestedField field : structType.fields()) {
+            fieldTypeIndexes.add(new OrcColumnId(nextFieldTypeIndex));
+            fieldNames.add(field.name());
+            Map<String, String> fieldAttributes = ImmutableMap.<String, String>builder()
+                    .put(ORC_ICEBERG_ID_KEY, Integer.toString(field.fieldId()))
+                    .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(field.isRequired()))
+                    .buildOrThrow();
+            List<OrcType> fieldOrcTypes = toOrcType(nextFieldTypeIndex, field.type(), fieldAttributes);
+            fieldTypesList.add(fieldOrcTypes);
+            nextFieldTypeIndex += fieldOrcTypes.size();
+        }
+
+        return ImmutableList.<OrcType>builder()
+                .add(new OrcType(
+                        OrcTypeKind.STRUCT,
+                        fieldTypeIndexes,
+                        fieldNames,
+                        Optional.empty(),
+                        Optional.empty(),
+                        Optional.empty(),
+                        attributes))
+                .addAll(fieldTypesList.stream().flatMap(List::stream).iterator())
+                .build();
+    }
+
+    private static List<OrcType> toOrcListType(int nextFieldTypeIndex, ListType listType, Map<String, String> attributes)
+    {
+        nextFieldTypeIndex++;
+
+        Map<String, String> elementAttributes = ImmutableMap.<String, String>builder()
+                .put(ORC_ICEBERG_ID_KEY, Integer.toString(listType.elementId()))
+                .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(listType.isElementRequired()))
+                .buildOrThrow();
+        List<OrcType> itemTypes = toOrcType(nextFieldTypeIndex, listType.elementType(), elementAttributes);
+
+        return ImmutableList.<OrcType>builder()
+                .add(new OrcType(
+                        OrcTypeKind.LIST,
+                        ImmutableList.of(new OrcColumnId(nextFieldTypeIndex)),
+                        ImmutableList.of("item"),
+                        Optional.empty(),
+                        Optional.empty(),
+                        Optional.empty(),
+                        attributes))
+                .addAll(itemTypes)
+                .build();
+    }
+
+    private static List<OrcType> toOrcMapType(int nextFieldTypeIndex, MapType mapType, Map<String, String> attributes)
+    {
+        nextFieldTypeIndex++;
+
+        List<OrcType> keyTypes = toOrcType(nextFieldTypeIndex, mapType.keyType(), ImmutableMap.<String, String>builder()
+                .put(ORC_ICEBERG_ID_KEY, Integer.toString(mapType.keyId()))
+                .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(true))
+                .buildOrThrow());
+
+        Map<String, String> valueAttributes = ImmutableMap.<String, String>builder()
+                .put(ORC_ICEBERG_ID_KEY, Integer.toString(mapType.valueId()))
+                .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(mapType.isValueRequired()))
+                .buildOrThrow();
+        List<OrcType> valueTypes = toOrcType(nextFieldTypeIndex + keyTypes.size(), mapType.valueType(), valueAttributes);
+
+        return ImmutableList.<OrcType>builder()
+                .add(new OrcType(
+                        OrcTypeKind.MAP,
+                        ImmutableList.of(new OrcColumnId(nextFieldTypeIndex), new OrcColumnId(nextFieldTypeIndex + keyTypes.size())),
+                        ImmutableList.of("key", "value"),
+                        Optional.empty(),
+                        Optional.empty(),
+                        Optional.empty(),
+                        attributes))
+                .addAll(keyTypes)
+                .addAll(valueTypes)
+                .build();
+    }
+}
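For orientation, a minimal usage sketch of the new OrcTypeConverter (the example class and schema below are hypothetical and not part of this change): a two-field Iceberg schema becomes a flat ORC type list in which OrcColumnId 0 is the root STRUCT and each field column carries its Iceberg field ID in the iceberg.id attribute, which getIcebergId later reads back when computing metrics.

import io.trino.orc.metadata.ColumnMetadata;
import io.trino.orc.metadata.OrcColumnId;
import io.trino.orc.metadata.OrcType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static io.trino.plugin.iceberg.util.OrcTypeConverter.ORC_ICEBERG_ID_KEY;
import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType;

class OrcTypeConverterSketch
{
    public static void main(String[] args)
    {
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()));

        ColumnMetadata<OrcType> orcTypes = toOrcType(schema);

        OrcType root = orcTypes.get(new OrcColumnId(0));        // STRUCT with child columns 1 and 2
        OrcType idColumn = orcTypes.get(new OrcColumnId(1));    // LONG
        OrcType nameColumn = orcTypes.get(new OrcColumnId(2));  // STRING

        System.out.println(root.getOrcTypeKind());                               // STRUCT
        System.out.println(idColumn.getAttributes().get(ORC_ICEBERG_ID_KEY));    // 1
        System.out.println(nameColumn.getAttributes().get(ORC_ICEBERG_ID_KEY));  // 2
    }
}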
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
index 4e44345bf34e..581c374e8eb3 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
@@ -69,6 +69,7 @@
 import static io.trino.plugin.hive.HiveType.HIVE_STRING;
 import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE;
 import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
+import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType;
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE;
@@ -137,7 +138,7 @@ private static void writeOrcContent(TrinoOutputFile outputFile)
                 OutputStreamOrcDataSink.create(outputFile),
                 columnNames,
                 types,
-                TypeConverter.toOrcType(TABLE_SCHEMA),
+                toOrcType(TABLE_SCHEMA),
                 NONE,
                 new OrcWriterOptions(),
                 ImmutableMap.of(),
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestDefaultLocationProvider.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestDefaultLocationProvider.java
new file mode 100644
index 000000000000..13aba3879d37
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestDefaultLocationProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.trino.plugin.iceberg.util; + +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.io.LocationProvider; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +class TestDefaultLocationProvider +{ + @Test + void testDefault() + { + LocationProvider provider = new DefaultLocationProvider("s3://table-location/", Map.of()); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://table-location/data/test"); + } + + @Test + void testDefaultWithCustomDataLocation() + { + LocationProvider provider = new DefaultLocationProvider("s3://table-location/", Map.of( + TableProperties.WRITE_DATA_LOCATION, "s3://write-location/")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://write-location/test"); + } + + @SuppressWarnings("deprecation") + @Test + void testDefaultPropertyResolution() + { + LocationProvider provider = new DefaultLocationProvider("s3://table-location/", Map.of( + TableProperties.WRITE_FOLDER_STORAGE_LOCATION, "s3://folder-location/")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://folder-location/test"); + + provider = new DefaultLocationProvider("s3://table-location/", Map.of( + TableProperties.WRITE_FOLDER_STORAGE_LOCATION, "s3://folder-location/", + TableProperties.WRITE_DATA_LOCATION, "s3://data-location/")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://data-location/test"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestObjectStoreLocationProvider.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestObjectStoreLocationProvider.java new file mode 100644 index 000000000000..3bab96bc719b --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestObjectStoreLocationProvider.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.util; + +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.io.LocationProvider; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +class TestObjectStoreLocationProvider +{ + @Test + void testObjectStorageWithinTableLocation() + { + LocationProvider provider = new ObjectStoreLocationProvider("s3://table-location/xyz", Map.of()); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://table-location/xyz/data/E9Jrug/test"); + } + + @Test + void testObjectStorageWithContextEmpty() + { + LocationProvider provider = new ObjectStoreLocationProvider( + "s3://table-location/", + Map.of(TableProperties.WRITE_DATA_LOCATION, "s3://data-location/write/")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://data-location/write/E9Jrug/test"); + } + + @Test + void testObjectStorageWithContextTable() + { + LocationProvider provider = new ObjectStoreLocationProvider( + "s3://table-location/xyz", + Map.of(TableProperties.WRITE_DATA_LOCATION, "s3://data-location/write/")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://data-location/write/E9Jrug/xyz/test"); + } + + @Test + void testObjectStorageWithContextDatabaseTable() + { + LocationProvider provider = new ObjectStoreLocationProvider( + "s3://table-location/abc/xyz", + Map.of(TableProperties.WRITE_DATA_LOCATION, "s3://data-location/write/")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://data-location/write/E9Jrug/abc/xyz/test"); + } + + @Test + void testObjectStorageWithContextPrefixDatabaseTable() + { + LocationProvider provider = new ObjectStoreLocationProvider( + "s3://table-location/hello/world/abc/xyz", + Map.of(TableProperties.WRITE_DATA_LOCATION, "s3://data-location/write/")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://data-location/write/E9Jrug/abc/xyz/test"); + } + + @SuppressWarnings("deprecation") + @Test + void testObjectStoragePropertyResolution() + { + LocationProvider provider = new ObjectStoreLocationProvider("s3://table-location/", Map.of( + TableProperties.WRITE_FOLDER_STORAGE_LOCATION, "s3://folder-location/xyz")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://folder-location/xyz/E9Jrug/test"); + + provider = new ObjectStoreLocationProvider("s3://table-location/", Map.of( + TableProperties.WRITE_FOLDER_STORAGE_LOCATION, "s3://folder-location/abc", + TableProperties.OBJECT_STORE_PATH, "s3://object-location/xyz")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://object-location/xyz/E9Jrug/test"); + + provider = new ObjectStoreLocationProvider("s3://table-location/", Map.of( + TableProperties.WRITE_FOLDER_STORAGE_LOCATION, "s3://folder-location/abc", + TableProperties.OBJECT_STORE_PATH, "s3://object-location/abc", + TableProperties.WRITE_DATA_LOCATION, "s3://data-location/xyz")); + + assertThat(provider.newDataLocation("test")) + .isEqualTo("s3://data-location/xyz/E9Jrug/test"); + } +} diff --git a/plugin/trino-session-property-managers/pom.xml b/plugin/trino-session-property-managers/pom.xml index 804800748261..74aabce98b64 100644 --- a/plugin/trino-session-property-managers/pom.xml +++ b/plugin/trino-session-property-managers/pom.xml @@ -156,6 +156,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive diff --git a/pom.xml b/pom.xml index cc7582914043..8b51e490eaf5 100644 --- a/pom.xml +++ b/pom.xml @@ -174,7 +174,7 @@ 2.7.7-1 1.10.2 - 237 + 238 4.13.1 14.0.0 
1.11.3 diff --git a/testing/trino-faulttolerant-tests/pom.xml b/testing/trino-faulttolerant-tests/pom.xml index 4df8c4a8066d..511f248352a2 100644 --- a/testing/trino-faulttolerant-tests/pom.xml +++ b/testing/trino-faulttolerant-tests/pom.xml @@ -185,6 +185,12 @@ test + + io.trino + trino-hdfs + test + + io.trino trino-hive