Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,18 @@
*/
package io.trino.plugin.hive;

import io.airlift.concurrent.MoreFutures;
import io.airlift.units.Duration;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.hive.s3.S3HiveQueryRunner;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.testng.SkipException;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.join;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

public class TestHiveAnalyzeCorruptStatistics
extends AbstractTestQueryFramework
Expand All @@ -49,15 +39,13 @@ protected QueryRunner createQueryRunner()
hiveMinioDataLake.start();

return S3HiveQueryRunner.builder(hiveMinioDataLake)
// Increase timeout to allow operations on a table having many columns
// Increase timeout because drop_stats doesn't finish within the default timeout
.setThriftMetastoreTimeout(new Duration(5, MINUTES))
.build();
}

// Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic.
@Test(invocationCount = 3)
@Test
public void testAnalyzeCorruptColumnStatisticsOnEmptyTable()
throws Exception
{
String tableName = "test_analyze_corrupt_column_statistics_" + randomTableSuffix();

Expand All @@ -79,45 +67,19 @@ public void testAnalyzeCorruptColumnStatisticsOnEmptyTable()
}

private void prepareBrokenColumnStatisticsTable(String tableName)
throws InterruptedException
{
int columnNumber = 1000;
List<String> columnNames = IntStream.rangeClosed(1, columnNumber).mapToObj(x -> "col_" + x + " int").collect(toImmutableList());
List<String> columnValues = IntStream.rangeClosed(1, columnNumber).mapToObj(String::valueOf).collect(toImmutableList());

assertUpdate("CREATE TABLE " + tableName + "(" + join(",", columnNames) + ")");
assertUpdate("INSERT INTO " + tableName + " VALUES (" + join(",", columnValues) + ")", 1);
assertUpdate("CALL system.drop_stats('tpch', '" + tableName + "')");

int threads = 10;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
try {
List<Future<Boolean>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
try {
getQueryRunner().execute("ANALYZE " + tableName);
return true;
}
catch (Exception e) {
return false;
}
}))
.collect(toImmutableList());

long succeeded = futures.stream()
.map(MoreFutures::getFutureValue)
.filter(success -> success)
.count();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 col", 1);

// Insert a duplicate row to simulate the broken column statistics state described in https://github.com/trinodb/trino/issues/13787
assertEquals(onMetastore("SELECT COUNT(1) FROM TAB_COL_STATS WHERE db_name = 'tpch' AND table_name = '" + tableName + "'"), "1\n");
onMetastore("INSERT INTO TAB_COL_STATS " +
"SELECT cs_id + 1, db_name, table_name, column_name, column_type, tbl_id, long_low_value, long_high_value, double_high_value, double_low_value, big_decimal_low_value, big_decimal_high_value, num_nulls, num_distincts, avg_col_len, max_col_len, num_trues, num_falses, last_analyzed " +
"FROM TAB_COL_STATS WHERE db_name = 'tpch' AND table_name = '" + tableName + "'");
assertEquals(onMetastore("SELECT COUNT(1) FROM TAB_COL_STATS WHERE db_name = 'tpch' AND table_name = '" + tableName + "'"), "2\n");
}

if (succeeded < 2) {
throw new SkipException("Expect other invocations succeed");
}
}
finally {
executor.shutdownNow();
executor.awaitTermination(10, SECONDS);
}
/**
 * Forwards a SQL statement to {@code runOnMetastore} on the object returned by
 * {@code hiveMinioDataLake.getHiveHadoop()} and hands back whatever that call returns.
 *
 * @param sql the SQL text to execute
 * @return the string produced by {@code runOnMetastore} for this statement
 */
private String onMetastore(@Language("SQL") String sql)
{
    String output = hiveMinioDataLake.getHiveHadoop().runOnMetastore(sql);
    return output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public String runOnHive(String query)
return executeInContainerFailOnError("beeline", "-u", "jdbc:hive2://localhost:10000/default", "-n", "hive", "-e", query);
}

/**
 * Executes a query with the {@code mysql} command-line client inside the container,
 * selecting the {@code metastore} database and connecting as {@code root}.
 * The client is invoked with {@code --batch} and {@code --column-names=false}.
 *
 * @param query the statement passed to the client through {@code -e}
 * @return the value returned by {@code executeInContainerFailOnError} for this command
 */
public String runOnMetastore(String query)
{
    String output = executeInContainerFailOnError(
            "mysql",
            "-D", "metastore",
            "-uroot",
            "-proot",
            "--batch",
            "--column-names=false",
            "-e", query);
    return output;
}

public HostAndPort getHiveMetastoreEndpoint()
{
return getMappedHostAndPortForExposedPort(HIVE_METASTORE_PORT);
Expand Down