From a538266ace8a8c534b05f7a1c33af39f30fb0c9c Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 4 Jul 2022 15:40:06 +0200 Subject: [PATCH 1/2] Use plugin class loader for the operations of `IcebergPageSource` Make use of wrapper class `ClassLoaderSafeUpdatablePageSource` in order to hide the aspect related to ensuring the usage of the plugin class loader before calling the `IcebergPageSource` operations. --- .../ClassLoaderSafeUpdatablePageSource.java | 147 ++++++++++++++++++ .../TestClassLoaderSafeWrappers.java | 2 + .../iceberg/IcebergPageSourceProvider.java | 25 +-- .../product/iceberg/TestIcebergOptimize.java | 118 ++++++++++++++ 4 files changed, 281 insertions(+), 11 deletions(-) create mode 100644 lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeUpdatablePageSource.java create mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergOptimize.java diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeUpdatablePageSource.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeUpdatablePageSource.java new file mode 100644 index 000000000000..2ef9e47ec4c3 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeUpdatablePageSource.java @@ -0,0 +1,147 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.base.classloader; + +import io.airlift.slice.Slice; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.UpdatablePageSource; +import io.trino.spi.metrics.Metrics; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; + +public class ClassLoaderSafeUpdatablePageSource + implements UpdatablePageSource +{ + private final UpdatablePageSource delegate; + private final ClassLoader classLoader; + + public ClassLoaderSafeUpdatablePageSource(UpdatablePageSource delegate, ClassLoader classLoader) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public long getCompletedBytes() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getCompletedBytes(); + } + } + + @Override + public OptionalLong getCompletedPositions() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getCompletedPositions(); + } + } + + @Override + public long getReadTimeNanos() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getReadTimeNanos(); + } + } + + @Override + public boolean isFinished() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.isFinished(); + } + } + + @Override + public Page getNextPage() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getNextPage(); + } + } + + @Override + public long getMemoryUsage() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return 
delegate.getMemoryUsage(); + } + } + + @Override + public void close() + throws IOException + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.close(); + } + } + + @Override + public CompletableFuture isBlocked() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.isBlocked(); + } + } + + @Override + public Metrics getMetrics() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getMetrics(); + } + } + + @Override + public void deleteRows(Block rowIds) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.deleteRows(rowIds); + } + } + + @Override + public void updateRows(Page page, List columnValueAndRowIdChannels) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.updateRows(page, columnValueAndRowIdChannels); + } + } + + @Override + public CompletableFuture> finish() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.finish(); + } + } + + @Override + public void abort() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.abort(); + } + } +} diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java index 6f8fab41065f..448db30b079a 100644 --- a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java @@ -24,6 +24,7 @@ import io.trino.spi.connector.ConnectorSplitSource; import io.trino.spi.connector.RecordSet; import io.trino.spi.connector.SystemTable; +import 
io.trino.spi.connector.UpdatablePageSource; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.ptf.ConnectorTableFunction; import org.testng.annotations.Test; @@ -51,6 +52,7 @@ public void test() testClassLoaderSafe(ConnectorSplitManager.class, ClassLoaderSafeConnectorSplitManager.class); testClassLoaderSafe(ConnectorNodePartitioningProvider.class, ClassLoaderSafeNodePartitioningProvider.class); testClassLoaderSafe(ConnectorSplitSource.class, ClassLoaderSafeConnectorSplitSource.class); + testClassLoaderSafe(UpdatablePageSource.class, ClassLoaderSafeUpdatablePageSource.class); testClassLoaderSafe(SystemTable.class, ClassLoaderSafeSystemTable.class); testClassLoaderSafe(ConnectorRecordSetProvider.class, ClassLoaderSafeConnectorRecordSetProvider.class); testClassLoaderSafe(RecordSet.class, ClassLoaderSafeRecordSet.class); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index dbc24238c7bf..a2bd9041360d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -37,6 +37,7 @@ import io.trino.parquet.predicate.Predicate; import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.ParquetReader; +import io.trino.plugin.base.classloader.ClassLoaderSafeUpdatablePageSource; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; @@ -336,17 +337,19 @@ public ConnectorPageSource createPageSource( table.getStorageProperties(), maxOpenPartitions); - return new IcebergPageSource( - tableSchema, - icebergColumns, - requiredColumns, - readColumns, - dataPageSource.get(), - projectionsAdapter, - Optional.of(deleteFilter).filter(filter -> filter.hasPosDeletes() || 
filter.hasEqDeletes()), - positionDeleteSink, - updatedRowPageSinkSupplier, - table.getUpdatedColumns()); + return new ClassLoaderSafeUpdatablePageSource( + new IcebergPageSource( + tableSchema, + icebergColumns, + requiredColumns, + readColumns, + dataPageSource.get(), + projectionsAdapter, + Optional.of(deleteFilter).filter(filter -> filter.hasPosDeletes() || filter.hasEqDeletes()), + positionDeleteSink, + updatedRowPageSinkSupplier, + table.getUpdatedColumns()), + getClass().getClassLoader()); } private ReaderPageSource createDataPageSource( diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergOptimize.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergOptimize.java new file mode 100644 index 000000000000..8af5e4859acb --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergOptimize.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.iceberg; + +import io.trino.tempto.ProductTest; +import org.assertj.core.api.Assertions; +import org.testng.annotations.Test; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.tests.product.TestGroups.ICEBERG; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; +import static io.trino.tests.product.utils.QueryExecutors.onSpark; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; + +/** + * Tests the Iceberg connector's {@code OPTIMIZE} table procedure; Spark is used only to set a table property that cannot yet be set through Trino. + */ +public class TestIcebergOptimize + extends ProductTest +{ + // see spark-defaults.conf + private static final String SPARK_CATALOG = "iceberg_test"; + private static final String TRINO_CATALOG = "iceberg"; + private static final String TEST_SCHEMA_NAME = "default"; + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testOptimizeTableAfterDelete() + { + String baseTableName = "test_optimize_with_small_split_size_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(regionkey integer, country varchar, description varchar) "); + + onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES " + + "(1, 'Poland', 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque rutrum erat faucibus tempus ullamcorper. 
Duis at auctor est, a accumsan magna.'), " + + "(1, 'Austria', ' Aliquam rhoncus tortor eu sagittis vulputate. Donec non erat nec dui tempor mattis pellentesque quis sapien. Proin dolor elit, porttitor aliquam erat et, aliquam eleifend elit. '), " + + "(1, 'France', 'Cras et elit sed nisl faucibus volutpat sed ac sapien. Sed ut metus vulputate, feugiat massa sed, lacinia ipsum. Phasellus ultrices ligula non ultricies gravida'), " + + "(1, 'Germany', 'Suspendisse eu nunc in lectus blandit pretium posuere non libero. Donec eu ligula hendrerit, pellentesque risus et, luctus odio.'), " + + "(2, 'United States of America', 'Pellentesque fermentum, tellus eget laoreet aliquam, nibh libero sollicitudin augue, vel posuere tellus odio vel est. Integer ac sem malesuada nibh imperdiet placerat.'), " + + "(2, 'Canada', 'Pellentesque porta nisi vel viverra sodales. Praesent et magna venenatis, varius quam eget, tristique orci. In ac cursus felis, vel elementum odio.'), " + + "(3, 'Japan', 'Sed vitae dignissim mi, eu mattis ante. Nam vulputate augue magna, vel viverra diam interdum at. Phasellus vehicula ante sit amet cursus venenatis.')," + + "(3, 'China', 'Fusce sit amet eleifend nunc. Maecenas bibendum felis felis, eu cursus neque viverra sed. Sed faucibus augue eu placerat elementum. Nam vitae hendrerit odio.')," + + "(3, 'Laos', 'Nulla vel placerat nibh. Pellentesque a cursus nunc. In tristique sollicitudin vestibulum. Sed imperdiet justo eget rhoncus condimentum. 
In commodo, purus sit amet malesuada rutrum, neque magna euismod elit')"); + + List initialFiles = getActiveFiles(TRINO_CATALOG, TEST_SCHEMA_NAME, baseTableName); + + onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE regionkey = 1"); + onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE regionkey = 2"); + + // Verify that delete files exist + assertThat( + onTrino().executeQuery( + format("SELECT summary['total-delete-files'] FROM %s.%s.\"%s$snapshots\" ", TRINO_CATALOG, TEST_SCHEMA_NAME, baseTableName) + + "WHERE snapshot_id = " + getCurrentSnapshotId(TRINO_CATALOG, TEST_SCHEMA_NAME, baseTableName))) + .containsOnly(row("2")); + + // Set the split size to a small number of bytes so each ORC stripe gets its own split. + // TODO Drop the Spark dependency once the 'read.split.target-size' setting can be set through Trino + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " SET TBLPROPERTIES ('read.split.target-size'='100')"); + + onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE OPTIMIZE"); + + List updatedFiles = getActiveFiles(TRINO_CATALOG, TEST_SCHEMA_NAME, baseTableName); + Assertions.assertThat(updatedFiles) + .hasSize(1) + .isNotEqualTo(initialFiles); + + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)) + .containsOnly( + row(3, "Japan", "Sed vitae dignissim mi, eu mattis ante. Nam vulputate augue magna, vel viverra diam interdum at. Phasellus vehicula ante sit amet cursus venenatis."), + row(3, "China", "Fusce sit amet eleifend nunc. Maecenas bibendum felis felis, eu cursus neque viverra sed. Sed faucibus augue eu placerat elementum. Nam vitae hendrerit odio."), + row(3, "Laos", "Nulla vel placerat nibh. Pellentesque a cursus nunc. In tristique sollicitudin vestibulum. Sed imperdiet justo eget rhoncus condimentum. 
In commodo, purus sit amet malesuada rutrum, neque magna euismod elit")); + + onTrino().executeQuery("DROP TABLE " + trinoTableName); + } + + private List getActiveFiles(String catalog, String schema, String tableName) + { + return onTrino().executeQuery(format("SELECT file_path FROM %s.%s.\"%s$files\"", catalog, schema, tableName)) + .rows().stream() + .map(row -> (String) row.get(0)) + .collect(toImmutableList()); + } + + private long getCurrentSnapshotId(String catalog, String schema, String tableName) + { + return (long) getOnlyElement(getOnlyElement(onTrino().executeQuery(format("SELECT snapshot_id FROM %s.%s.\"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", catalog, schema, tableName)).rows())); + } + + private static String sparkTableName(String tableName) + { + return format("%s.%s.%s", SPARK_CATALOG, TEST_SCHEMA_NAME, tableName); + } + + private static String trinoTableName(String tableName) + { + return format("%s.%s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME, tableName); + } +} From 684674a0bdd9a622cc71b8aced512b64e992673c Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Wed, 6 Jul 2022 12:59:45 +0200 Subject: [PATCH 2/2] Ensure the correct instantiation of the `Configuration` object During the instantiation Configuration captures current thread context class loader. Ensure that the class loader captured corresponds to the plugin class loader. 
--- lib/trino-hadoop-toolkit/pom.xml | 9 +++++---- .../hadoop/ConfigurationInstantiator.java | 19 ++++++------------- .../phoenix5/ConfigurationInstantiator.java | 19 ++++++------------- 3 files changed, 17 insertions(+), 30 deletions(-) diff --git a/lib/trino-hadoop-toolkit/pom.xml b/lib/trino-hadoop-toolkit/pom.xml index 06d4c894fcc7..e6532c158982 100644 --- a/lib/trino-hadoop-toolkit/pom.xml +++ b/lib/trino-hadoop-toolkit/pom.xml @@ -17,14 +17,15 @@ + - io.trino.hadoop - hadoop-apache + io.trino + trino-spi - com.google.guava - guava + io.trino.hadoop + hadoop-apache diff --git a/lib/trino-hadoop-toolkit/src/main/java/io/trino/hadoop/ConfigurationInstantiator.java b/lib/trino-hadoop-toolkit/src/main/java/io/trino/hadoop/ConfigurationInstantiator.java index 3bfcb1fde063..9bf1262424ba 100644 --- a/lib/trino-hadoop-toolkit/src/main/java/io/trino/hadoop/ConfigurationInstantiator.java +++ b/lib/trino-hadoop-toolkit/src/main/java/io/trino/hadoop/ConfigurationInstantiator.java @@ -13,11 +13,10 @@ */ package io.trino.hadoop; +import io.trino.spi.classloader.ThreadContextClassLoader; import org.apache.hadoop.conf.Configuration; import org.gaul.modernizer_maven_annotations.SuppressModernizer; -import static com.google.common.base.Preconditions.checkState; - public final class ConfigurationInstantiator { private ConfigurationInstantiator() {} @@ -37,17 +36,11 @@ public static Configuration newConfigurationWithDefaultResources() private static Configuration newConfiguration(boolean loadDefaults) { - // Configuration captures TCCL and it may used later e.g. to load filesystem implementation class - ClassLoader tccl = Thread.currentThread().getContextClassLoader(); - ClassLoader expectedClassLoader = ConfigurationInstantiator.class.getClassLoader(); - checkState( - tccl == expectedClassLoader, - "During instantiation, the Configuration object captures the TCCL and uses it to resolve classes by name. 
" + - "For this reason, the current TCCL %s should be same as this class's classloader %s. " + - "Otherwise the constructed Configuration will use *some* classloader to resolve classes", - tccl, - expectedClassLoader); - return newConfigurationWithTccl(loadDefaults); + // Ensure that the context class loader used while instantiating the `Configuration` object corresponds to the + // class loader of the `ConfigurationInstantiator` + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(ConfigurationInstantiator.class.getClassLoader())) { + return newConfigurationWithTccl(loadDefaults); + } } // Usage of `new Configuration(boolean)` is not allowed. Only ConfigurationInstantiator diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/ConfigurationInstantiator.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/ConfigurationInstantiator.java index c04f963a701a..facaa810c483 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/ConfigurationInstantiator.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/ConfigurationInstantiator.java @@ -13,28 +13,21 @@ */ package io.trino.plugin.phoenix5; +import io.trino.spi.classloader.ThreadContextClassLoader; import org.apache.hadoop.conf.Configuration; import org.gaul.modernizer_maven_annotations.SuppressModernizer; -import static com.google.common.base.Preconditions.checkState; - final class ConfigurationInstantiator { private ConfigurationInstantiator() {} public static Configuration newEmptyConfiguration() { - // Configuration captures TCCL and it may used later e.g. to load filesystem implementation class - ClassLoader tccl = Thread.currentThread().getContextClassLoader(); - ClassLoader expectedClassLoader = ConfigurationInstantiator.class.getClassLoader(); - checkState( - tccl == expectedClassLoader, - "During instantiation, the Configuration object captures the TCCL and uses it to resolve classes by name. 
" + - "For this reason, the current TCCL %s should be same as this class's classloader %s. " + - "Otherwise the constructed Configuration will use *some* classloader to resolve classes", - tccl, - expectedClassLoader); - return newConfigurationWithTccl(); + // Ensure that the context class loader used while instantiating the `Configuration` object corresponds to the + // class loader of the `ConfigurationInstantiator` + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(ConfigurationInstantiator.class.getClassLoader())) { + return newConfigurationWithTccl(); + } } // Usage of `new Configuration(boolean)` is not allowed. Only ConfigurationInstantiator