diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java index a37d3a496aed..48d89bbe5058 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java @@ -87,6 +87,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -441,8 +442,20 @@ private Map<String, List<ColumnStatisticsObj>> getMetastorePartitionColumnStatis private static Map<String, HiveColumnStatistics> groupStatisticsByColumn(List<ColumnStatisticsObj> statistics, OptionalLong rowCount) { - return statistics.stream() - .collect(toImmutableMap(ColumnStatisticsObj::getColName, statisticsObj -> ThriftMetastoreUtil.fromMetastoreApiColumnStatistics(statisticsObj, rowCount))); + Map<String, HiveColumnStatistics> statisticsByColumn = new HashMap<>(); + for (ColumnStatisticsObj stats : statistics) { + HiveColumnStatistics newColumnStatistics = ThriftMetastoreUtil.fromMetastoreApiColumnStatistics(stats, rowCount); + if (statisticsByColumn.containsKey(stats.getColName())) { + HiveColumnStatistics existingColumnStatistics = statisticsByColumn.get(stats.getColName()); + if (!newColumnStatistics.equals(existingColumnStatistics)) { + log.warn("Ignore inconsistent statistics in %s column: %s and %s", stats.getColName(), newColumnStatistics, existingColumnStatistics); + } + } + else { + statisticsByColumn.put(stats.getColName(), newColumnStatistics); + } + } + return ImmutableMap.copyOf(statisticsByColumn); } @Override diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAnalyzeCorruptStatistics.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAnalyzeCorruptStatistics.java new file mode 100644 index 000000000000..d1d0f33c04d3 --- /dev/null +++ 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAnalyzeCorruptStatistics.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import io.airlift.concurrent.MoreFutures; +import io.airlift.units.Duration; +import io.trino.plugin.hive.containers.HiveMinioDataLake; +import io.trino.plugin.hive.s3.S3HiveQueryRunner; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.testng.SkipException; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.IntStream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.String.join; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestHiveAnalyzeCorruptStatistics + extends AbstractTestQueryFramework +{ + private HiveMinioDataLake hiveMinioDataLake; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake("test-analyze")); + hiveMinioDataLake.start(); + + 
return S3HiveQueryRunner.builder(hiveMinioDataLake) + // Increase timeout to allow operations on a table having many columns + .setThriftMetastoreTimeout(new Duration(5, MINUTES)) + .build(); + } + + // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. + @Test(invocationCount = 3) + public void testAnalyzeCorruptColumnStatisticsOnEmptyTable() + throws Exception + { + String tableName = "test_analyze_corrupt_column_statistics_" + randomTableSuffix(); + + // Concurrent ANALYZE statements generate duplicated rows in Thrift metastore's TAB_COL_STATS table when column statistics is empty + prepareBrokenColumnStatisticsTable(tableName); + + // SHOW STATS should succeed even when the column statistics is broken + assertQuerySucceeds("SHOW STATS FOR " + tableName); + + // ANALYZE and drop_stats are unsupported for tables having broken column statistics + assertThatThrownBy(() -> query("ANALYZE " + tableName)) + .hasMessage(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString()) // Thrift metastore doesn't throw helpful message + .hasStackTraceContaining("ThriftHiveMetastore.setTableColumnStatistics"); + + assertThatThrownBy(() -> query("CALL system.drop_stats('tpch', '" + tableName + "')")) + .hasMessageContaining("The query returned more than one instance BUT either unique is set to true or only aggregates are to be returned, so should have returned one result maximum"); + + assertUpdate("DROP TABLE " + tableName); + } + + private void prepareBrokenColumnStatisticsTable(String tableName) + throws InterruptedException + { + int columnNumber = 1000; + List<String> columnNames = IntStream.rangeClosed(1, columnNumber).mapToObj(x -> "col_" + x + " int").collect(toImmutableList()); + List<String> columnValues = IntStream.rangeClosed(1, columnNumber).mapToObj(String::valueOf).collect(toImmutableList()); + + assertUpdate("CREATE TABLE " + tableName + "(" + join(",", columnNames) + ")"); + assertUpdate("INSERT INTO 
" + tableName + " VALUES (" + join(",", columnValues) + ")", 1); + assertUpdate("CALL system.drop_stats('tpch', '" + tableName + "')"); + + int threads = 10; + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = newFixedThreadPool(threads); + try { + List> futures = IntStream.range(0, threads) + .mapToObj(threadNumber -> executor.submit(() -> { + barrier.await(10, SECONDS); + try { + getQueryRunner().execute("ANALYZE " + tableName); + return true; + } + catch (Exception e) { + return false; + } + })) + .collect(toImmutableList()); + + long succeeded = futures.stream() + .map(MoreFutures::getFutureValue) + .filter(success -> success) + .count(); + + if (succeeded < 2) { + throw new SkipException("Expect other invocations succeed"); + } + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingThriftHiveMetastoreBuilder.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingThriftHiveMetastoreBuilder.java index 1e506755c64f..6e485a4c00c2 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingThriftHiveMetastoreBuilder.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingThriftHiveMetastoreBuilder.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.net.HostAndPort; +import io.airlift.units.Duration; import io.trino.hdfs.DynamicHdfsConfiguration; import io.trino.hdfs.HdfsConfig; import io.trino.hdfs.HdfsConfigurationInitializer; @@ -68,6 +69,15 @@ public static TestingThriftHiveMetastoreBuilder testingThriftHiveMetastoreBuilde private TestingThriftHiveMetastoreBuilder() {} + public TestingThriftHiveMetastoreBuilder metastoreClient(HostAndPort address, Duration timeout) + { + requireNonNull(address, "address is null"); + requireNonNull(timeout, "timeout is null"); + checkState(metastoreLocator == null, "Metastore client already set"); + 
metastoreLocator = new TestingMetastoreLocator(HiveTestUtils.SOCKS_PROXY, address, timeout); + return this; + } + public TestingThriftHiveMetastoreBuilder metastoreClient(HostAndPort address) { requireNonNull(address, "address is null"); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingMetastoreLocator.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingMetastoreLocator.java index c3fdb1fb4926..e355f598f583 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingMetastoreLocator.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestingMetastoreLocator.java @@ -26,7 +26,7 @@ public class TestingMetastoreLocator implements MetastoreLocator { private static final HiveMetastoreAuthentication AUTHENTICATION = new NoHiveMetastoreAuthentication(); - private static final Duration TIMEOUT = new Duration(20, SECONDS); + public static final Duration TIMEOUT = new Duration(20, SECONDS); private final DefaultThriftMetastoreClientFactory factory; private final HostAndPort address; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java index 68051de2a58a..2e676c70a683 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java @@ -16,9 +16,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; import io.airlift.log.Logger; +import io.airlift.units.Duration; import io.trino.plugin.hive.HiveQueryRunner; import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; +import io.trino.plugin.hive.metastore.thrift.TestingMetastoreLocator; import io.trino.testing.DistributedQueryRunner; import io.trino.tpch.TpchTable; 
@@ -82,6 +84,7 @@ public static class Builder extends HiveQueryRunner.Builder<Builder> { private HostAndPort hiveMetastoreEndpoint; + private Duration thriftMetastoreTimeout = TestingMetastoreLocator.TIMEOUT; private String s3Endpoint; private String s3AccessKey; private String s3SecretKey; @@ -93,6 +96,12 @@ public Builder setHiveMetastoreEndpoint(HostAndPort hiveMetastoreEndpoint) return this; } + public Builder setThriftMetastoreTimeout(Duration thriftMetastoreTimeout) + { + this.thriftMetastoreTimeout = requireNonNull(thriftMetastoreTimeout, "thriftMetastoreTimeout is null"); + return this; + } + public Builder setS3Endpoint(String s3Endpoint) { this.s3Endpoint = requireNonNull(s3Endpoint, "s3Endpoint is null"); @@ -135,7 +144,7 @@ public DistributedQueryRunner build() addHiveProperty("hive.s3.path-style-access", "true"); setMetastore(distributedQueryRunner -> new BridgingHiveMetastore( testingThriftHiveMetastoreBuilder() - .metastoreClient(hiveMetastoreEndpoint) + .metastoreClient(hiveMetastoreEndpoint, thriftMetastoreTimeout) .build())); setInitialSchemasLocationBase("s3a://" + bucketName); // cannot use s3:// as Hive metastore is not configured to accept it return super.build();