From 8a87c77694ce1697334f03b04b796eba72248334 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Wed, 20 Jul 2022 12:17:47 -0700 Subject: [PATCH 1/3] Replace ResultWithQueryId with MaterializedResultWithQueryId Uses of ResultWithQueryId out side of some AbstractTestingTrinoClient always use MaterializedResult, and this change simplifies those usages. The new class will be used in QueryRunner to get results and query id for testing events. --- .../MaterializedResultWithQueryId.java | 40 +++++++++++++++++++ .../BaseDeltaLakeConnectorSmokeTest.java | 8 ++-- .../BaseDeltaLakeMinioConnectorTest.java | 6 +-- .../deltalake/TestDeltaLakeAnalyze.java | 5 +-- .../TestDeltaLakeDynamicFiltering.java | 7 ++-- .../deltalake/TestPredicatePushdown.java | 8 ++-- .../plugin/deltalake/TestSplitPruning.java | 5 +-- .../plugin/hive/BaseHiveConnectorTest.java | 8 ++-- .../hive/TestHiveDistributedJoinQueries.java | 5 +-- .../iceberg/BaseIcebergConnectorTest.java | 8 ++-- .../kafka/TestKafkaIntegrationPushDown.java | 15 ++++--- .../TestKuduIntegrationDynamicFilter.java | 5 +-- .../memory/TestMemoryConnectorTest.java | 9 ++--- .../BaseFailureRecoveryTest.java | 6 +-- .../testing/AbstractTestJoinQueries.java | 2 +- .../testing/AbstractTestQueryFramework.java | 2 +- .../io/trino/testing/BaseConnectorTest.java | 2 +- .../BaseDynamicPartitionPruningTest.java | 28 ++++++------- .../trino/testing/DistributedQueryRunner.java | 7 ++-- .../tests/TestLateMaterializationQueries.java | 6 +-- .../trino/tests/TestMinWorkerRequirement.java | 9 ++--- 21 files changed, 112 insertions(+), 79 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/testing/MaterializedResultWithQueryId.java diff --git a/core/trino-main/src/main/java/io/trino/testing/MaterializedResultWithQueryId.java b/core/trino-main/src/main/java/io/trino/testing/MaterializedResultWithQueryId.java new file mode 100644 index 000000000000..f070ad10c613 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/testing/MaterializedResultWithQueryId.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.testing; + +import io.trino.spi.QueryId; + +import static java.util.Objects.requireNonNull; + +public class MaterializedResultWithQueryId +{ + private final QueryId queryId; + private final MaterializedResult result; + + public MaterializedResultWithQueryId(QueryId queryId, MaterializedResult result) + { + this.queryId = requireNonNull(queryId, "queryId is null"); + this.result = requireNonNull(result, "result is null"); + } + + public QueryId getQueryId() + { + return queryId; + } + + public MaterializedResult getResult() + { + return result; + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index 7384785872cb..f98b218ba982 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -30,8 +30,8 @@ import io.trino.testing.BaseConnectorSmokeTest; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.tpch.TpchTable; import org.intellij.lang.annotations.Language; @@ -327,9 +327,9 @@ public void testInputDataSize() hiveTableName, getLocationForTable(bucketName, "foo"))); - ResultWithQueryId deltaResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), "SELECT * FROM foo"); + MaterializedResultWithQueryId deltaResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), "SELECT * FROM foo"); assertEquals(deltaResult.getResult().getRowCount(), 2); - ResultWithQueryId hiveResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), format("SELECT * FROM %s.%s.%s", "hive", SCHEMA, hiveTableName)); + MaterializedResultWithQueryId hiveResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), format("SELECT * FROM %s.%s.%s", "hive", SCHEMA, hiveTableName)); assertEquals(hiveResult.getResult().getRowCount(), 2); QueryManager queryManager = queryRunner.getCoordinator().getQueryManager(); @@ -1544,7 +1544,7 @@ private void invalidateMetadataCache(String tableName) private void testCountQuery(@Language("SQL") String sql, long expectedRowCount, long expectedSplitCount) { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); assertEquals(result.getResult().getOnlyColumnAsSet(), ImmutableSet.of(expectedRowCount)); verifySplitCount(result.getQueryId(), expectedSplitCount); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index 2becfdf47756..2f3e8eff6e80 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -23,9 +23,9 @@ import io.trino.testing.BaseConnectorTest; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.tpch.TpchTable; @@ -336,7 +336,7 @@ public void testTimestampPredicatePushdown(String value) assertUpdate("INSERT INTO " + tableName + " VALUES (TIMESTAMP '" + value + "')", 1); DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId( + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId( getSession(), "SELECT * FROM " + tableName + " WHERE t < TIMESTAMP '" + value + "'"); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputDataSize().toBytes(), 0); @@ -394,7 +394,7 @@ public void testAddColumnToPartitionedTable() } } - private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId queryResult) + private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, MaterializedResultWithQueryId queryResult) { return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java index 0b919e51e839..6e564662659e 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java @@ -19,9 +19,8 @@ import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.spi.QueryId; import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.sql.TestTable; import org.testng.annotations.Test; @@ -498,7 +497,7 @@ public void testDropStatsAccessControl() private void runAnalyzeVerifySplitCount(String tableName, long expectedSplitCount) { - ResultWithQueryId analyzeResult = getDistributedQueryRunner().executeWithQueryId(getSession(), "ANALYZE " + tableName); + MaterializedResultWithQueryId analyzeResult = getDistributedQueryRunner().executeWithQueryId(getSession(), "ANALYZE " + tableName); verifySplitCount(analyzeResult.getQueryId(), expectedSplitCount); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java index 1bfebe45ddf0..91b279e5d424 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java @@ -32,9 +32,8 @@ import io.trino.split.SplitSource; import io.trino.sql.planner.OptimizerConfig.JoinDistributionType; import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.transaction.TransactionId; import io.trino.transaction.TransactionManager; import org.testng.annotations.DataProvider; @@ -107,8 +106,8 @@ public Object[][] joinDistributionTypes() public void testDynamicFiltering(JoinDistributionType joinDistributionType) { String query = "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice > 59995 AND orders.totalprice < 60000"; - ResultWithQueryId filteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(true, joinDistributionType), query); - ResultWithQueryId unfilteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(false, joinDistributionType), query); + MaterializedResultWithQueryId filteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(true, joinDistributionType), query); + MaterializedResultWithQueryId unfilteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(false, joinDistributionType), query); assertEqualsIgnoreOrder(filteredResult.getResult().getMaterializedRows(), unfilteredResult.getResult().getMaterializedRows()); QueryInputStats filteredStats = getQueryInputStats(filteredResult.getQueryId()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java index 544885d5c26f..ccb7cf5b8397 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java @@ -19,9 +19,9 @@ import io.trino.spi.QueryId; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import org.testng.annotations.Test; import org.testng.asserts.SoftAssert; @@ -143,7 +143,7 @@ public void testUpdatePushdown() */ private void assertPushdown(String actual, String expected, long countProcessed) { - ResultWithQueryId result = executeWithQueryId(actual); + MaterializedResultWithQueryId result = executeWithQueryId(actual); Set actualRows = Set.copyOf(result.getResult().getMaterializedRows()); Set expectedRows = Set.copyOf( computeExpected(expected, result.getResult().getTypes()).getMaterializedRows()); @@ -176,7 +176,7 @@ private void assertPushdown(String actual, String expected, long countProcessed) */ private void assertPushdownUpdate(String sql, long count, long countProcessed) { - ResultWithQueryId result = executeWithQueryId(sql); + MaterializedResultWithQueryId result = executeWithQueryId(sql); OptionalLong actualCount = result.getResult().getUpdateCount(); SoftAssert softly = new SoftAssert(); @@ -189,7 +189,7 @@ private void assertPushdownUpdate(String sql, long count, long countProcessed) softly.assertAll(); } - private ResultWithQueryId executeWithQueryId(String sql) + private MaterializedResultWithQueryId executeWithQueryId(String sql) { return getDistributedQueryRunner().executeWithQueryId(getSession(), sql); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java index 5c156e52f469..effb9059fd0c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java @@ -20,7 +20,6 @@ import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import org.intellij.lang.annotations.Language; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -135,10 +134,10 @@ public void testStatsPruningNaN(String type) Set.of(), 0); - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResult result = getDistributedQueryRunner().execute( getSession(), format("SELECT name FROM %s WHERE val IS NOT NULL", tableName)); - assertEquals(result.getResult().getOnlyColumnAsSet(), Set.of("a5", "b5", "a6", "b6")); + assertEquals(result.getOnlyColumnAsSet(), Set.of("a5", "b5", "a6", "b6")); } @Test diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index efa1dd1fc3d4..b16bf81e9063 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -53,9 +53,9 @@ import io.trino.testing.BaseConnectorTest; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.testing.sql.TrinoSqlExecutor; @@ -4725,7 +4725,7 @@ private void doTestParquetTimestampPredicatePushdown(Session baseSession, HiveTi assertQuery(session, "SELECT * FROM " + tableName, format("VALUES (%s)", formatTimestamp(value))); DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId( + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId( session, format("SELECT * FROM %s WHERE t < %s", tableName, formatTimestamp(value))); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputDataSize().toBytes(), 0); @@ -4756,7 +4756,7 @@ public void testOrcTimestampPredicatePushdown(HiveTimestampPrecision timestampPr // to account for the fact that ORC stats are stored at millisecond precision and Trino rounds timestamps, // we filter by timestamps that differ from the actual value by at least 1ms, to observe pruning DistributedQueryRunner queryRunner = getDistributedQueryRunner(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId( + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId( session, format("SELECT * FROM test_orc_timestamp_predicate_pushdown WHERE t < %s", formatTimestamp(value.minusNanos(MILLISECONDS.toNanos(1))))); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputDataSize().toBytes(), 0); @@ -4845,7 +4845,7 @@ private void assertNoDataRead(@Language("SQL") String sql) results -> assertThat(results.getRowCount()).isEqualTo(0)); } - private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId queryResult) + private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, MaterializedResultWithQueryId queryResult) { return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId()); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java index d716490c909a..3ef04090548e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java @@ -18,9 +18,8 @@ import io.trino.metadata.QualifiedObjectName; import io.trino.operator.OperatorStats; import io.trino.testing.AbstractTestJoinQueries; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import org.testng.annotations.Test; import static com.google.common.base.Verify.verify; @@ -59,7 +58,7 @@ public void testJoinWithEmptyBuildSide() Session session = Session.builder(getSession()) .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) .build(); - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( session, "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 123.4567"); assertEquals(result.getResult().getRowCount(), 0); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 02eb7529af43..7f7b4153812a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -36,9 +36,9 @@ import io.trino.testing.BaseConnectorTest; import io.trino.testing.DataProviders; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.tpch.TpchTable; @@ -3277,15 +3277,15 @@ public void testSplitPruningFromDataFileStatistics(DataMappingTestSetup testSetu private void verifySplitCount(String query, int expectedSplitCount) { - ResultWithQueryId selectAllPartitionsResult = getDistributedQueryRunner().executeWithQueryId(getSession(), query); + MaterializedResultWithQueryId selectAllPartitionsResult = getDistributedQueryRunner().executeWithQueryId(getSession(), query); assertEqualsIgnoreOrder(selectAllPartitionsResult.getResult().getMaterializedRows(), computeActual(withoutPredicatePushdown(getSession()), query).getMaterializedRows()); verifySplitCount(selectAllPartitionsResult.getQueryId(), expectedSplitCount); } private void verifyPredicatePushdownDataRead(@Language("SQL") String query, boolean supportsPushdown) { - ResultWithQueryId resultWithPredicatePushdown = getDistributedQueryRunner().executeWithQueryId(getSession(), query); - ResultWithQueryId resultWithoutPredicatePushdown = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId resultWithPredicatePushdown = getDistributedQueryRunner().executeWithQueryId(getSession(), query); + MaterializedResultWithQueryId resultWithoutPredicatePushdown = getDistributedQueryRunner().executeWithQueryId( withoutPredicatePushdown(getSession()), query); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java index faeeea04fbb8..fcc2b5fa607a 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java @@ -19,9 +19,8 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.kafka.TestingKafka; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -93,7 +92,7 @@ public void testPartitionPushDown() String sql = format("SELECT count(*) FROM default.%s WHERE _partition_id=1", topicNamePartition); assertEventually(() -> { - ResultWithQueryId queryResult = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); assertEquals(getQueryInfo(getDistributedQueryRunner(), queryResult).getQueryStats().getProcessedInputPositions(), MESSAGE_NUM / 2); }); } @@ -111,7 +110,7 @@ private void assertProcessedInputPossitions(String sql, long expectedProcessedIn { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); assertEventually(() -> { - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions(), expectedProcessedInputPositions); }); } @@ -131,7 +130,7 @@ public void testTimestampCreateTimeModePushDown() // timestamp_upper_bound_force_push_down_enabled default as false. assertEventually(() -> { - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions()) .isEqualTo(998); }); @@ -143,7 +142,7 @@ public void testTimestampCreateTimeModePushDown() .setSystemProperty("kafka.timestamp_upper_bound_force_push_down_enabled", "true") .build(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(sessionWithUpperBoundPushDownEnabled, sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(sessionWithUpperBoundPushDownEnabled, sql); assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions()) .isEqualTo(2); }); @@ -163,13 +162,13 @@ public void testTimestampLogAppendModePushDown() recordMessage.getEndTime()); assertEventually(() -> { - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions()) .isEqualTo(2); }); } - private static QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId queryResult) + private static QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, MaterializedResultWithQueryId queryResult) { return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId()); } diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java index 386c995bcde6..31ce6614458d 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java @@ -30,9 +30,8 @@ import io.trino.split.SplitSource; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.tpch.TpchTable; import io.trino.transaction.TransactionId; import io.trino.transaction.TransactionManager; @@ -180,7 +179,7 @@ public void testJoinDynamicFilteringBlockProbeSide() private void assertDynamicFiltering(@Language("SQL") String selectQuery, Session session, int expectedRowCount, int... expectedOperatorRowsRead) { DistributedQueryRunner runner = getDistributedQueryRunner(); - ResultWithQueryId result = runner.executeWithQueryId(session, selectQuery); + MaterializedResultWithQueryId result = runner.executeWithQueryId(session, selectQuery); assertEquals(result.getResult().getRowCount(), expectedRowCount); assertEquals(getOperatorRowsRead(runner, result.getQueryId()), Ints.asList(expectedOperatorRowsRead)); diff --git a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java index d94676bc3288..fe8e47d63b91 100644 --- a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java +++ b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java @@ -25,9 +25,8 @@ import io.trino.spi.metrics.Metrics; import io.trino.testing.BaseConnectorTest; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.testng.services.Flaky; @@ -187,7 +186,7 @@ public void testExplainCustomMetricsScanFilter() private Metrics collectCustomMetrics(String sql) { DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId result = runner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId result = runner.executeWithQueryId(getSession(), sql); return runner .getCoordinator() .getQueryManager() @@ -202,7 +201,7 @@ private Metrics collectCustomMetrics(String sql) @Test(timeOut = 30_000) public void testPhysicalInputPositions() { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), "SELECT * FROM lineitem JOIN tpch.tiny.supplier ON lineitem.suppkey = supplier.suppkey " + "AND supplier.name = 'Supplier#000000001'"); @@ -455,7 +454,7 @@ public void testJoinDynamicFilteringMultiJoin(JoinDistributionType joinDistribut private void assertDynamicFiltering(@Language("SQL") String selectQuery, Session session, int expectedRowCount, int... expectedOperatorRowsRead) { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(session, selectQuery); + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(session, selectQuery); assertEquals(result.getResult().getRowCount(), expectedRowCount); assertEquals(getOperatorRowsRead(getDistributedQueryRunner(), result.getQueryId()), Ints.asList(expectedOperatorRowsRead)); diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java index 7f82cb2dfce0..4e979a93c89f 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java @@ -29,8 +29,8 @@ import io.trino.spi.QueryId; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.tpch.TpchTable; import org.assertj.core.api.AbstractThrowableAssert; import org.intellij.lang.annotations.Language; @@ -563,7 +563,7 @@ private ExecutionResult execute(Session session, String query, Optional String tableName = "table_" + randomTableSuffix(); setup.ifPresent(sql -> getQueryRunner().execute(noRetries(session), resolveTableName(sql, tableName))); - ResultWithQueryId resultWithQueryId = null; + MaterializedResultWithQueryId resultWithQueryId = null; RuntimeException failure = null; try { resultWithQueryId = getDistributedQueryRunner().executeWithQueryId(withTraceToken(session, traceToken), resolveTableName(query, tableName)); @@ -722,7 +722,7 @@ private static class ExecutionResult private final Optional updatedTableStatistics; private ExecutionResult( - ResultWithQueryId resultWithQueryId, + MaterializedResultWithQueryId resultWithQueryId, Optional updatedTableContent, Optional updatedTableStatistics) { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java index 9c9583bbed62..541a53f820aa 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java @@ -2349,7 +2349,7 @@ public void testOutputDuplicatesInsensitiveJoin() private void assertJoinOutputPositions(@Language("SQL") String sql, int expectedJoinOutputPositions) { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( Session.builder(getSession()) .setSystemProperty(JOIN_REORDERING_STRATEGY, "NONE") .build(), diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java index b4b660b9a2d2..85da0e30216e 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java @@ -465,7 +465,7 @@ protected void assertQueryStats( Consumer resultAssertion) { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); - ResultWithQueryId resultWithQueryId = queryRunner.executeWithQueryId(session, query); + MaterializedResultWithQueryId resultWithQueryId = queryRunner.executeWithQueryId(session, query); QueryStats queryStats = queryRunner.getCoordinator() .getQueryManager() .getFullQueryInfo(resultWithQueryId.getQueryId()) diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 4b4065f91c1f..6619dce8650a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -3408,7 +3408,7 @@ public void testWrittenStats() String tableName = "test_written_stats_" + randomTableSuffix(); try { String sql = "CREATE TABLE " + tableName + " AS SELECT * FROM nation"; - ResultWithQueryId resultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId resultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); QueryInfo queryInfo = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(resultResultWithQueryId.getQueryId()); assertEquals(queryInfo.getQueryStats().getOutputPositions(), 1L); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java index 64eb3137cb12..f650072fccf2 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java @@ -99,7 +99,7 @@ protected Session getSession() public void testJoinWithEmptyBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -124,7 +124,7 @@ public void testJoinWithSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey " + "AND supplier.name = 'Supplier#000000001'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -148,7 +148,7 @@ public void testJoinWithSelectiveBuildSide() public void testJoinWithNonSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -173,7 +173,7 @@ public void testJoinWithNonSelectiveBuildSide() public void testJoinLargeBuildSideRangeDynamicFiltering() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN orders ON partitioned_lineitem.orderkey = orders.orderkey"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -204,7 +204,7 @@ public void testJoinWithMultipleDynamicFiltersOnProbe() "SELECT supplier.suppkey FROM " + "partitioned_lineitem JOIN tpch.tiny.supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name IN ('Supplier#000000001', 'Supplier#000000002')" + ") t JOIN supplier ON t.suppkey = supplier.suppkey AND supplier.suppkey IN (2, 3)"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -237,7 +237,7 @@ public void testJoinWithImplicitCoercion() "VALUES " + LINEITEM_COUNT); @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem_int l JOIN supplier s ON l.suppkey_int = s.suppkey AND s.name = 'Supplier#000000001'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -260,7 +260,7 @@ public void testJoinWithImplicitCoercion() public void testSemiJoinWithEmptyBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'abc')"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -283,7 +283,7 @@ public void testSemiJoinWithEmptyBuildSide() public void testSemiJoinWithSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'Supplier#000000001')"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -307,7 +307,7 @@ public void testSemiJoinWithSelectiveBuildSide() public void testSemiJoinWithNonSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier)"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -332,7 +332,7 @@ public void testSemiJoinWithNonSelectiveBuildSide() public void testSemiJoinLargeBuildSideRangeDynamicFiltering() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE orderkey IN (SELECT orderkey FROM orders)"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -359,7 +359,7 @@ public void testSemiJoinLargeBuildSideRangeDynamicFiltering() public void testRightJoinWithEmptyBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'abc'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -382,7 +382,7 @@ public void testRightJoinWithEmptyBuildSide() public void testRightJoinWithSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'Supplier#000000001'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -406,7 +406,7 @@ public void testRightJoinWithSelectiveBuildSide() public void testRightJoinWithNonSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -494,7 +494,7 @@ private void assertDynamicFilters(Session session, @Language("SQL") String query private long getQueryInputPositions(Session session, @Language("SQL") String sql, int expectedRowCount) { DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId result = runner.executeWithQueryId(session, sql); + MaterializedResultWithQueryId result = runner.executeWithQueryId(session, sql); assertThat(result.getResult().getRowCount()).isEqualTo(expectedRowCount); QueryId queryId = result.getQueryId(); QueryStats stats = runner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats(); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java index 8fb63a79cc42..3751180cc0de 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java @@ -536,11 +536,12 @@ public MaterializedResult execute(Session session, @Language("SQL") String sql) } } - public ResultWithQueryId executeWithQueryId(Session session, @Language("SQL") String sql) + public MaterializedResultWithQueryId executeWithQueryId(Session session, @Language("SQL") String sql) { lock.readLock().lock(); try { - return trinoClient.execute(session, sql); + ResultWithQueryId result = trinoClient.execute(session, sql); + return new MaterializedResultWithQueryId(result.getQueryId(), result.getResult()); } finally { lock.readLock().unlock(); @@ -550,7 +551,7 @@ public ResultWithQueryId executeWithQueryId(Session session, @Override public MaterializedResultWithPlan executeWithPlan(Session session, String sql, WarningCollector warningCollector) { - ResultWithQueryId resultWithQueryId = executeWithQueryId(session, sql); + MaterializedResultWithQueryId resultWithQueryId = executeWithQueryId(session, sql); return new MaterializedResultWithPlan(resultWithQueryId.getResult().toTestTypes(), getQueryPlan(resultWithQueryId.getQueryId())); } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java index effabc67fa5a..e97f3025e861 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java @@ -18,8 +18,8 @@ import io.trino.execution.QueryManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.tests.tpch.TpchQueryRunnerBuilder; import org.intellij.lang.annotations.Language; import org.testng.annotations.Test; @@ -87,10 +87,10 @@ private void assertLazyQuery(@Language("SQL") String sql) { QueryManager queryManager = getDistributedQueryRunner().getCoordinator().getQueryManager(); - ResultWithQueryId workProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(lateMaterialization(), sql); + MaterializedResultWithQueryId workProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(lateMaterialization(), sql); QueryInfo workProcessorQueryInfo = queryManager.getFullQueryInfo(workProcessorResultResultWithQueryId.getQueryId()); - ResultWithQueryId noWorkProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(noLateMaterialization(), sql); + MaterializedResultWithQueryId noWorkProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(noLateMaterialization(), sql); QueryInfo noWorkProcessorQueryInfo = queryManager.getFullQueryInfo(noWorkProcessorResultResultWithQueryId.getQueryId()); // ensure results are correct diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java b/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java index 956050942e7c..03ab4762263c 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java @@ -21,8 +21,7 @@ import io.trino.execution.QueryInfo; import io.trino.execution.QueryManager; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; -import io.trino.testing.ResultWithQueryId; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.tests.tpch.TpchQueryRunnerBuilder; import org.testng.annotations.Test; @@ -186,17 +185,17 @@ public void testMultipleRequiredWorkerNodesSessionOverride() .setCatalog("tpch") .setSchema("tiny") .build(); - ListenableFuture> queryFuture1 = service.submit(() -> queryRunner.executeWithQueryId(session1, "SELECT COUNT(*) from lineitem")); + ListenableFuture queryFuture1 = service.submit(() -> queryRunner.executeWithQueryId(session1, "SELECT COUNT(*) from lineitem")); Session session2 = Session.builder(session1) .setSystemProperty(REQUIRED_WORKERS_COUNT, "3") .build(); - ListenableFuture> queryFuture2 = service.submit(() -> queryRunner.executeWithQueryId(session2, "SELECT COUNT(*) from lineitem")); + ListenableFuture queryFuture2 = service.submit(() -> queryRunner.executeWithQueryId(session2, "SELECT COUNT(*) from lineitem")); Session session3 = Session.builder(session1) .setSystemProperty(REQUIRED_WORKERS_COUNT, "4") .build(); - ListenableFuture> queryFuture3 = service.submit(() -> queryRunner.executeWithQueryId(session3, "SELECT COUNT(*) from lineitem")); + ListenableFuture queryFuture3 = service.submit(() -> queryRunner.executeWithQueryId(session3, "SELECT COUNT(*) from lineitem")); MILLISECONDS.sleep(1000); // None of the queries should run From 9f1a600c92d39d47f4e2a61e5b383ee486ff63ef Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Thu, 21 Jul 2022 10:52:36 -0700 Subject: [PATCH 2/3] Add QueryFailedException to capture query id from failed test queries When testing it can be important to know the query id when a query fails. --- .../trino/testing/QueryFailedException.java | 39 +++++++++++++++++++ .../testing/AbstractTestingTrinoClient.java | 19 ++++----- 2 files changed, 49 insertions(+), 9 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/testing/QueryFailedException.java diff --git a/core/trino-main/src/main/java/io/trino/testing/QueryFailedException.java b/core/trino-main/src/main/java/io/trino/testing/QueryFailedException.java new file mode 100644 index 000000000000..6ba9b4396736 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/testing/QueryFailedException.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.testing; + +import io.trino.spi.QueryId; + +public class QueryFailedException + extends RuntimeException +{ + private final QueryId queryId; + + public QueryFailedException(QueryId queryId, String message) + { + super(message); + this.queryId = queryId; + } + + public QueryFailedException(QueryId queryId, String message, Throwable cause) + { + super(message, cause); + this.queryId = queryId; + } + + public QueryId getQueryId() + { + return queryId; + } +} diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java index ea845f4d17ee..29f744740f8a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java @@ -19,7 +19,6 @@ import io.trino.client.ClientSelectedRole; import io.trino.client.ClientSession; import io.trino.client.Column; -import io.trino.client.QueryError; import io.trino.client.QueryStatusInfo; import io.trino.client.StatementClient; import io.trino.metadata.MetadataUtil; @@ -82,11 +81,13 @@ public void close() protected abstract ResultsSession getResultSession(Session session); public ResultWithQueryId execute(@Language("SQL") String sql) + throws QueryFailedException { return execute(defaultSession, sql); } public ResultWithQueryId execute(Session session, @Language("SQL") String sql) + throws QueryFailedException { ResultsSession resultsSession = getResultSession(session); @@ -99,10 +100,10 @@ public ResultWithQueryId execute(Session session, @Language("SQL") String sql } checkState(client.isFinished()); - QueryError error = client.finalStatusInfo().getError(); + QueryStatusInfo results = client.finalStatusInfo(); + QueryId queryId = new QueryId(results.getId()); - if (error == null) { - QueryStatusInfo results = client.finalStatusInfo(); + if (results.getError() == null) { if (results.getUpdateType() != null) { resultsSession.setUpdateType(results.getUpdateType()); } @@ -114,14 +115,14 @@ public ResultWithQueryId execute(Session session, @Language("SQL") String sql resultsSession.setStatementStats(results.getStats()); T result = resultsSession.build(client.getSetSessionProperties(), client.getResetSessionProperties()); - return new ResultWithQueryId<>(new QueryId(results.getId()), result); + return new ResultWithQueryId<>(queryId, result); } - if (error.getFailureInfo() != null) { - RuntimeException remoteException = error.getFailureInfo().toException(); - throw new RuntimeException(Optional.ofNullable(remoteException.getMessage()).orElseGet(remoteException::toString), remoteException); + if (results.getError().getFailureInfo() != null) { + RuntimeException remoteException = results.getError().getFailureInfo().toException(); + throw new QueryFailedException(queryId, Optional.ofNullable(remoteException.getMessage()).orElseGet(remoteException::toString), remoteException); } - throw new RuntimeException("Query failed: " + error.getMessage()); + throw new QueryFailedException(queryId, "Query failed: " + results.getError().getMessage()); // dump query info to console for debugging (NOTE: not pretty printed) // JsonCodec queryInfoJsonCodec = createCodecFactory().prettyPrint().jsonCodec(QueryInfo.class); From 92533f92ea4df1902c010c47e5bdaec5c09efab8 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Thu, 21 Jul 2022 10:59:57 -0700 Subject: [PATCH 3/3] Rewrite EventsCollector to have separate event collection per query The old design was more complex because all events were collected together and this caused complex threading issues. --- .../execution/EventsAwaitingQueries.java | 64 +++-- .../io/trino/execution/EventsCollector.java | 219 +++++++++--------- .../execution/TestCompletedEventWarnings.java | 15 +- .../execution/TestConnectorEventListener.java | 29 +-- .../execution/TestEventListenerBasic.java | 165 ++++++------- .../TestEventListenerWithSplits.java | 35 ++- 6 files changed, 261 insertions(+), 266 deletions(-) diff --git a/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java b/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java index 68a0404c628c..24ca576a6f93 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java @@ -13,46 +13,51 @@ */ package io.trino.execution; +import io.airlift.units.Duration; import io.trino.Session; +import io.trino.execution.EventsCollector.QueryEvents; +import io.trino.spi.QueryId; +import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; -import io.trino.testing.QueryRunner; +import io.trino.testing.MaterializedResultWithQueryId; +import io.trino.testing.QueryFailedException; import org.intellij.lang.annotations.Language; -import java.time.Duration; import java.util.Optional; import static com.google.common.base.Strings.nullToEmpty; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.testng.Assert.fail; class EventsAwaitingQueries { private final EventsCollector eventsCollector; - private final QueryRunner queryRunner; - private final Duration extraWaitTime; + private final DistributedQueryRunner queryRunner; - EventsAwaitingQueries(EventsCollector eventsCollector, QueryRunner queryRunner, Duration extraWaitTime) + EventsAwaitingQueries(EventsCollector eventsCollector, DistributedQueryRunner queryRunner) { this.eventsCollector = requireNonNull(eventsCollector, "eventsCollector is null"); this.queryRunner = requireNonNull(queryRunner, "queryRunner is null"); - this.extraWaitTime = extraWaitTime; } - MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected, Session session) + MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql, Session session) throws Exception { - return runQueryAndWaitForEvents(sql, numEventsExpected, session, Optional.empty()); + return runQueryAndWaitForEvents(sql, session, Optional.empty()); } - MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected, Session session, Optional expectedExceptionRegEx) + MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql, Session session, Optional expectedExceptionRegEx) throws Exception { - eventsCollector.reset(numEventsExpected); + QueryId queryId = null; MaterializedResult result = null; try { - result = queryRunner.execute(session, sql); + MaterializedResultWithQueryId materializedResultWithQueryId = queryRunner.executeWithQueryId(session, sql); + queryId = materializedResultWithQueryId.getQueryId(); + result = materializedResultWithQueryId.getResult(); } catch (RuntimeException exception) { if (expectedExceptionRegEx.isPresent()) { @@ -64,12 +69,43 @@ MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int num else { throw exception; } + if (exception instanceof QueryFailedException) { + queryId = ((QueryFailedException) exception).getQueryId(); + } + } + if (queryId == null) { + return null; } - eventsCollector.waitForEvents(10); + QueryEvents queryEvents = eventsCollector.getQueryEvents(queryId); + queryEvents.waitForQueryCompletion(new Duration(3, SECONDS)); + // Sleep some more so extraneous, unexpected events can be recorded too. // This is not rock solid but improves effectiveness on detecting duplicate events. - Thread.sleep(extraWaitTime.toMillis()); - return result; + SECONDS.sleep(1); + + return new MaterializedResultWithEvents(result, queryEvents); + } + + public static class MaterializedResultWithEvents + { + private final MaterializedResult materializedResult; + private final QueryEvents queryEvents; + + public MaterializedResultWithEvents(MaterializedResult materializedResult, QueryEvents queryEvents) + { + this.materializedResult = materializedResult; + this.queryEvents = requireNonNull(queryEvents, "queryEvents is null"); + } + + public MaterializedResult getMaterializedResult() + { + return materializedResult; + } + + public QueryEvents getQueryEvents() + { + return queryEvents; + } } } diff --git a/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java b/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java index d5fab70e9461..98c7ba9d36ec 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java @@ -14,6 +14,8 @@ package io.trino.execution; import com.google.common.collect.ImmutableList; +import io.airlift.units.Duration; +import io.trino.spi.QueryId; import io.trino.spi.eventlistener.QueryCompletedEvent; import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.spi.eventlistener.SplitCompletedEvent; @@ -21,164 +23,163 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; +import java.util.concurrent.TimeoutException; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; @ThreadSafe final class EventsCollector { - private final EventFilters eventFilters; - @GuardedBy("this") - private ImmutableList.Builder queryCreatedEvents; - @GuardedBy("this") - private ImmutableList.Builder queryCompletedEvents; - @GuardedBy("this") - private ImmutableList.Builder splitCompletedEvents; - @GuardedBy("this") - private CountDownLatch eventsLatch; - - public EventsCollector() - { - this(EventFilters.builder().build()); - } - - public EventsCollector(EventFilters eventFilters) - { - this.eventFilters = requireNonNull(eventFilters, "eventFilters is null"); - reset(0); - } - - public synchronized void reset(int numEvents) - { - queryCreatedEvents = ImmutableList.builder(); - queryCompletedEvents = ImmutableList.builder(); - splitCompletedEvents = ImmutableList.builder(); - - eventsLatch = new CountDownLatch(numEvents); - } - - public void waitForEvents(int timeoutSeconds) - throws InterruptedException - { - CountDownLatch eventsLatch; - synchronized (this) { - // since the eventsLatch is replaced in the reset method, a lock is required for proper memory visibility - eventsLatch = this.eventsLatch; - } - eventsLatch.await(timeoutSeconds, TimeUnit.SECONDS); - } + private final ConcurrentHashMap queryEvents = new ConcurrentHashMap<>(); public synchronized void addQueryCreated(QueryCreatedEvent event) { - if (!eventFilters.getQueryCreatedFilter().test(event)) { - return; - } - queryCreatedEvents.add(event); - eventsLatch.countDown(); + getQueryEvents(new QueryId(event.getMetadata().getQueryId())).addQueryCreated(event); } public synchronized void addQueryCompleted(QueryCompletedEvent event) { - if (!eventFilters.getQueryCompletedFilter().test(event)) { - return; - } - queryCompletedEvents.add(event); - eventsLatch.countDown(); + getQueryEvents(new QueryId(event.getMetadata().getQueryId())).addQueryCompleted(event); } public synchronized void addSplitCompleted(SplitCompletedEvent event) { - if (!eventFilters.getSplitCompletedFilter().test(event)) { - return; - } - splitCompletedEvents.add(event); - eventsLatch.countDown(); + getQueryEvents(new QueryId(event.getQueryId())).addSplitCompleted(event); } - public synchronized List getQueryCreatedEvents() + public QueryEvents getQueryEvents(QueryId queryId) { - return queryCreatedEvents.build(); - } - - public synchronized List getQueryCompletedEvents() - { - return queryCompletedEvents.build(); - } - - public synchronized List getSplitCompletedEvents() - { - return splitCompletedEvents.build(); + return queryEvents.computeIfAbsent(queryId, ignored -> new QueryEvents()); } @ThreadSafe - public static class EventFilters + public static class QueryEvents { - private final Predicate queryCreatedFilter; - private final Predicate queryCompletedFilter; - private final Predicate splitCompletedFilter; - - private EventFilters( - Predicate queryCreatedFilter, - Predicate queryCompletedFilter, - Predicate splitCompletedFilter) + @GuardedBy("this") + private QueryCreatedEvent queryCreatedEvent; + @GuardedBy("this") + private QueryCompletedEvent queryCompletedEvent; + @GuardedBy("this") + private final CountDownLatch queryCompleteLatch = new CountDownLatch(1); + + @GuardedBy("this") + private final List splitCompletedEvents = new ArrayList<>(); + @GuardedBy("this") + private CountDownLatch splitEventLatch; + + @GuardedBy("this") + private final List failures = new ArrayList<>(); + + public synchronized QueryCreatedEvent getQueryCreatedEvent() { - this.queryCreatedFilter = requireNonNull(queryCreatedFilter, "queryCreatedFilter is null"); - this.queryCompletedFilter = requireNonNull(queryCompletedFilter, "queryCompletedFilter is null"); - this.splitCompletedFilter = requireNonNull(splitCompletedFilter, "splitCompletedFilter is null"); + checkFailure(); + if (queryCreatedEvent == null) { + throw new IllegalStateException("QueryCreatedEvent has not been set"); + } + return queryCreatedEvent; } - Predicate getQueryCreatedFilter() + public synchronized QueryCompletedEvent getQueryCompletedEvent() { - return queryCreatedFilter; + checkFailure(); + if (queryCompletedEvent == null) { + throw new IllegalStateException("QueryCompletedEvent has not been set"); + } + return queryCompletedEvent; } - Predicate getQueryCompletedFilter() + private synchronized void addQueryCreated(QueryCreatedEvent event) { - return queryCompletedFilter; + requireNonNull(event, "event is null"); + if (queryCreatedEvent != null) { + failures.add(new RuntimeException("QueryCreateEvent already set")); + return; + } + queryCreatedEvent = event; + + if (queryCompletedEvent != null) { + queryCompleteLatch.countDown(); + } } - Predicate getSplitCompletedFilter() + private synchronized void addQueryCompleted(QueryCompletedEvent event) { - return splitCompletedFilter; + requireNonNull(event, "event is null"); + if (queryCompletedEvent != null) { + failures.add(new RuntimeException("QueryCompletedEvent already set")); + return; + } + queryCompletedEvent = event; + + if (queryCreatedEvent != null) { + queryCompleteLatch.countDown(); + } } - public static Builder builder() + private synchronized void addSplitCompleted(SplitCompletedEvent event) { - return new Builder(); + splitCompletedEvents.add(event); + if (splitEventLatch != null) { + splitEventLatch.countDown(); + } } - public static class Builder + public void waitForQueryCompletion(Duration timeout) + throws InterruptedException, TimeoutException { - private Predicate queryCreatedFilter = queryCreatedEvent -> true; - private Predicate queryCompletedFilter = queryCompletedEvent -> true; - private Predicate splitCompletedFilter = splitCompletedEvent -> true; - - public Builder setQueryCreatedFilter(Predicate queryCreatedFilter) - { - this.queryCreatedFilter = queryCreatedFilter; - return this; + CountDownLatch latch; + synchronized (this) { + latch = queryCompleteLatch; } - public Builder setQueryCompletedFilter(Predicate queryCompletedFilter) - { - this.queryCompletedFilter = queryCompletedFilter; - return this; + boolean finished = latch.await(timeout.toMillis(), MILLISECONDS); + if (!finished) { + throw new TimeoutException("Query did not complete in " + timeout); + } + } + + public synchronized List waitForSplitCompletedEvents(int numberOfSplitEvents, Duration timeout) + throws InterruptedException, TimeoutException + { + CountDownLatch latch; + synchronized (this) { + checkFailure(); + + if (splitCompletedEvents.size() >= numberOfSplitEvents) { + return ImmutableList.copyOf(splitCompletedEvents); + } + + if (splitEventLatch != null) { + // support for waiting multiple times is complex and currently not needed, so it has not been implemented + throw new IllegalStateException("Wait for split completion already triggered for this query"); + } + splitEventLatch = new CountDownLatch(numberOfSplitEvents - splitCompletedEvents.size()); + latch = splitEventLatch; } - public Builder setSplitCompletedFilter(Predicate splitCompletedFilter) - { - this.splitCompletedFilter = splitCompletedFilter; - return this; + boolean finished = latch.await(timeout.toMillis(), MILLISECONDS); + if (!finished) { + throw new TimeoutException("Split events did not complete in " + timeout); } - public EventFilters build() - { - return new EventFilters(queryCreatedFilter, queryCompletedFilter, splitCompletedFilter); + synchronized (this) { + checkFailure(); + return ImmutableList.copyOf(splitCompletedEvents); + } + } + + private synchronized void checkFailure() + { + if (failures.isEmpty()) { + return; } + RuntimeException exception = new RuntimeException("Event collection failed"); + failures.forEach(exception::addSuppressed); } } } diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java b/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java index 1c95d338275b..3c6e4e2e5f6e 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java @@ -15,7 +15,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; -import io.trino.execution.EventsCollector.EventFilters; +import io.trino.execution.EventsCollector.QueryEvents; import io.trino.execution.TestEventListenerPlugin.TestingEventListenerPlugin; import io.trino.execution.warnings.WarningCollectorConfig; import io.trino.spi.TrinoWarning; @@ -29,13 +29,11 @@ import org.testng.annotations.Test; import java.io.IOException; -import java.time.Duration; import java.util.List; import java.util.Set; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.SessionTestUtils.TEST_SESSION; import static org.testng.Assert.fail; @@ -44,10 +42,7 @@ public class TestCompletedEventWarnings { private static final int TEST_WARNINGS = 5; - private final EventsCollector generatedEvents = new EventsCollector(EventFilters.builder() - .setQueryCreatedFilter(event -> false) - .setSplitCompletedFilter(event -> false) - .build()); + private final EventsCollector generatedEvents = new EventsCollector(); private Closer closer; private EventsAwaitingQueries queries; @@ -63,7 +58,7 @@ public void setUp() .build(); closer.register(queryRunner); queryRunner.installPlugin(new TestingEventListenerPlugin(generatedEvents)); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); } @AfterClass(alwaysRun = true) @@ -92,9 +87,9 @@ public void testCompletedEventWarnings() private void assertWarnings(@Language("SQL") String sql, List expectedWarnings) throws Exception { - queries.runQueryAndWaitForEvents(sql, 1, TEST_SESSION); + QueryEvents queryEvents = queries.runQueryAndWaitForEvents(sql, TEST_SESSION).getQueryEvents(); - Set warnings = getOnlyElement(generatedEvents.getQueryCompletedEvents()) + Set warnings = queryEvents.getQueryCompletedEvent() .getWarnings() .stream() .map(TrinoWarning::getWarningCode) diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java b/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java index 76cae6e6d531..3dfcc0de154d 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java @@ -16,30 +16,21 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; import io.trino.connector.MockConnectorFactory; -import io.trino.execution.EventsCollector.EventFilters; import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; -import io.trino.spi.eventlistener.QueryCompletedEvent; -import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.testing.DistributedQueryRunner; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.IOException; -import java.time.Duration; -import java.util.List; -import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.SessionTestUtils.TEST_SESSION; -import static org.assertj.core.api.Assertions.assertThat; @Test(singleThreaded = true) public class TestConnectorEventListener { - private final EventsCollector generatedEvents = new EventsCollector(EventFilters.builder() - .setSplitCompletedFilter(event -> false) - .build()); + private final EventsCollector generatedEvents = new EventsCollector(); private Closer closer; private EventsAwaitingQueries queries; @@ -62,7 +53,7 @@ public Iterable getConnectorFactories() }); closer.register(queryRunner); queryRunner.createCatalog("mock-catalog", "mock"); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); } @AfterClass(alwaysRun = true) @@ -79,20 +70,6 @@ public void tearDown() public void testConnectorEventHandlerReceivingEvents() throws Exception { - queries.runQueryAndWaitForEvents("SELECT 1", 2, TEST_SESSION); - - List queryCreatedEvents = generatedEvents.getQueryCreatedEvents(); - List queryCompletedEvents = generatedEvents.getQueryCompletedEvents(); - List allEvents = ImmutableList.builder() - .addAll(queryCreatedEvents) - .addAll(queryCompletedEvents) - .build(); - List eventTypes = allEvents.stream().map(event -> event.getClass().getSimpleName()).sorted().collect(toImmutableList()); - assertThat(allEvents) - .size().withFailMessage(() -> "got events: " + eventTypes).isEqualTo(2); - assertThat(queryCreatedEvents) - .size().withFailMessage(() -> "got events: " + eventTypes).isEqualTo(1); - assertThat(queryCompletedEvents) - .size().withFailMessage(() -> "got events: " + eventTypes).isEqualTo(1); + queries.runQueryAndWaitForEvents("SELECT 1", TEST_SESSION).getQueryEvents(); } } diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java index d0262c935fe2..b6186425def6 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java @@ -19,7 +19,8 @@ import io.trino.Session; import io.trino.connector.MockConnectorFactory; import io.trino.connector.MockConnectorTableHandle; -import io.trino.execution.EventsCollector.EventFilters; +import io.trino.execution.EventsAwaitingQueries.MaterializedResultWithEvents; +import io.trino.execution.EventsCollector.QueryEvents; import io.trino.execution.TestEventListenerPlugin.TestingEventListenerPlugin; import io.trino.plugin.base.metrics.LongCount; import io.trino.plugin.resourcegroups.ResourceGroupManagerPlugin; @@ -46,7 +47,6 @@ import io.trino.spi.security.ViewExpression; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.testng.annotations.DataProvider; @@ -90,7 +90,6 @@ public class TestEventListenerBasic private static final String BIGINT_TYPE = BIGINT.getDisplayName(); private static final Metrics TEST_METRICS = new Metrics(ImmutableMap.of("test_metrics", new LongCount(1))); - private final EventsCollector generatedEvents = new EventsCollector(buildEventFilters()); private EventsAwaitingQueries queries; @Override @@ -104,6 +103,8 @@ protected QueryRunner createQueryRunner() .setClientInfo("{\"clientVersion\":\"testVersion\"}") .build(); + EventsCollector generatedEvents = new EventsCollector(); + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).setNodeCount(1).build(); queryRunner.installPlugin(new TpchPlugin()); queryRunner.installPlugin(new TestingEventListenerPlugin(generatedEvents)); @@ -195,20 +196,11 @@ public Iterable getConnectorFactories() queryRunner.getCoordinator().getResourceGroupManager().get() .setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json"))); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); return queryRunner; } - private static EventFilters buildEventFilters() - { - return EventFilters.builder() - .setQueryCreatedFilter(event -> !event.getMetadata().getQuery().contains(IGNORE_EVENT_MARKER)) - .setQueryCompletedFilter(event -> !event.getMetadata().getQuery().contains(IGNORE_EVENT_MARKER)) - .setSplitCompletedFilter(event -> false) - .build(); - } - private String getResourceFilePath(String fileName) { try { @@ -219,10 +211,10 @@ private String getResourceFilePath(String fileName) } } - private MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected) + private MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql) throws Exception { - return queries.runQueryAndWaitForEvents(sql, numEventsExpected, getSession()); + return queries.runQueryAndWaitForEvents(sql, getSession()); } @Test @@ -320,9 +312,9 @@ private void assertFailedQuery(@Language("SQL") String sql, String expectedFailu private void assertFailedQuery(Session session, @Language("SQL") String sql, String expectedFailure) throws Exception { - queries.runQueryAndWaitForEvents(sql, 2, session, Optional.of(expectedFailure)); + QueryEvents queryEvents = queries.runQueryAndWaitForEvents(sql, session, Optional.of(expectedFailure)).getQueryEvents(); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertEquals(queryCompletedEvent.getMetadata().getQuery(), sql); QueryFailureInfo failureInfo = queryCompletedEvent.getFailureInfo() @@ -334,9 +326,9 @@ private void assertFailedQuery(Session session, @Language("SQL") String sql, Str public void testReferencedTablesAndRoutines() throws Exception { - runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertEquals(tables.size(), 1); @@ -365,9 +357,9 @@ public void testReferencedTablesAndRoutines() public void testReferencedTablesWithViews() throws Exception { - runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_view", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_view").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertThat(tables).hasSize(2); @@ -403,9 +395,9 @@ public void testReferencedTablesWithViews() public void testReferencedTablesWithMaterializedViews() throws Exception { - runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_materialized_view", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_materialized_view").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertThat(tables).hasSize(2); @@ -440,9 +432,9 @@ public void testReferencedTablesWithMaterializedViews() public void testReferencedTablesInCreateView() throws Exception { - runQueryAndWaitForEvents("CREATE VIEW mock.default.create_another_test_view AS SELECT * FROM nation", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE VIEW mock.default.create_another_test_view AS SELECT * FROM nation").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); @@ -471,9 +463,9 @@ public void testReferencedTablesInCreateView() public void testReferencedTablesInCreateMaterializedView() throws Exception { - runQueryAndWaitForEvents("CREATE MATERIALIZED VIEW mock.default.test_view AS SELECT * FROM nation", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE MATERIALIZED VIEW mock.default.test_view AS SELECT * FROM nation").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); @@ -502,9 +494,9 @@ public void testReferencedTablesInCreateMaterializedView() public void testReferencedTablesWithRowFilter() throws Exception { - runQueryAndWaitForEvents("SELECT 1 FROM mock.default.test_table_with_row_filter", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT 1 FROM mock.default.test_table_with_row_filter").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertThat(tables).hasSize(2); @@ -540,9 +532,11 @@ public void testReferencedTablesWithRowFilter() public void testReferencedTablesWithColumnMask() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_table_with_referring_mask AS SELECT * FROM mock.default.test_table_with_column_mask", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents( + "CREATE TABLE mock.default.create_table_with_referring_mask AS SELECT * FROM mock.default.test_table_with_column_mask" + ).getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); @@ -591,8 +585,8 @@ public void testReferencedColumns() throws Exception { // assert that ColumnInfos for referenced columns are present when the table was not aliased - runQueryAndWaitForEvents("SELECT name, nationkey FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT name, nationkey FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); TableInfo table = getOnlyElement(event.getMetadata().getTables()); assertEquals( @@ -602,8 +596,8 @@ public void testReferencedColumns() ImmutableSet.of("name", "nationkey")); // assert that ColumnInfos for referenced columns are present when the table was aliased - runQueryAndWaitForEvents("SELECT name, nationkey FROM nation n", 2); - event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + queryEvents = runQueryAndWaitForEvents("SELECT name, nationkey FROM nation n").getQueryEvents(); + event = queryEvents.getQueryCompletedEvent(); table = getOnlyElement(event.getMetadata().getTables()); assertEquals( @@ -613,8 +607,8 @@ public void testReferencedColumns() ImmutableSet.of("name", "nationkey")); // assert that ColumnInfos for referenced columns are present when the table was aliased and its columns were aliased - runQueryAndWaitForEvents("SELECT a, b FROM nation n(a, b, c, d)", 2); - event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + queryEvents = runQueryAndWaitForEvents("SELECT a, b FROM nation n(a, b, c, d)").getQueryEvents(); + event = queryEvents.getQueryCompletedEvent(); table = getOnlyElement(event.getMetadata().getTables()); assertEquals( @@ -632,9 +626,9 @@ public void testPrepareAndExecute() String prepareQuery = "PREPARE stmt FROM " + selectQuery; // QueryCreated: 1, QueryCompleted: 1, Splits: 0 - runQueryAndWaitForEvents(prepareQuery, 2); + QueryEvents queryEvents = runQueryAndWaitForEvents(prepareQuery).getQueryEvents(); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + QueryCreatedEvent queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -642,7 +636,7 @@ public void testPrepareAndExecute() assertEquals(queryCreatedEvent.getMetadata().getQuery(), prepareQuery); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getIoMetadata().getOutput(), Optional.empty()); @@ -655,9 +649,9 @@ public void testPrepareAndExecute() // Add prepared statement to a new session to eliminate any impact on other tests in this suite. Session sessionWithPrepare = Session.builder(getSession()).addPreparedStatement("stmt", selectQuery).build(); - queries.runQueryAndWaitForEvents("EXECUTE stmt USING 'SHIP'", 2, sessionWithPrepare); + queryEvents = queries.runQueryAndWaitForEvents("EXECUTE stmt USING 'SHIP'", sessionWithPrepare).getQueryEvents(); - queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -666,7 +660,7 @@ public void testPrepareAndExecute() assertTrue(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); assertEquals(queryCreatedEvent.getMetadata().getPreparedQuery().get(), selectQuery); - queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getIoMetadata().getOutput(), Optional.empty()); @@ -682,19 +676,19 @@ public void testPrepareAndExecute() public void testOutputStats() throws Exception { - MaterializedResult result = runQueryAndWaitForEvents("SELECT 1 FROM lineitem", 2); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + MaterializedResultWithEvents result = runQueryAndWaitForEvents("SELECT 1 FROM lineitem"); + QueryCreatedEvent queryCreatedEvent = result.getQueryEvents().getQueryCreatedEvent(); + QueryCompletedEvent queryCompletedEvent = result.getQueryEvents().getQueryCompletedEvent(); QueryStats queryStats = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(new QueryId(queryCreatedEvent.getMetadata().getQueryId())).getQueryStats(); assertTrue(queryStats.getOutputDataSize().toBytes() > 0L); assertTrue(queryCompletedEvent.getStatistics().getOutputBytes() > 0L); - assertEquals(result.getRowCount(), queryStats.getOutputPositions()); - assertEquals(result.getRowCount(), queryCompletedEvent.getStatistics().getOutputRows()); + assertEquals(result.getMaterializedResult().getRowCount(), queryStats.getOutputPositions()); + assertEquals(result.getMaterializedResult().getRowCount(), queryCompletedEvent.getStatistics().getOutputRows()); - runQueryAndWaitForEvents("SELECT COUNT(1) FROM lineitem", 2); - queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); - queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + result = runQueryAndWaitForEvents("SELECT COUNT(1) FROM lineitem"); + queryCreatedEvent = result.getQueryEvents().getQueryCreatedEvent(); + queryCompletedEvent = result.getQueryEvents().getQueryCompletedEvent(); queryStats = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(new QueryId(queryCreatedEvent.getMetadata().getQueryId())).getQueryStats(); assertTrue(queryStats.getOutputDataSize().toBytes() > 0L); @@ -756,8 +750,8 @@ public void testOutputColumnsForSelectWithConstantExpression() public void testOutputColumnsForCreateTableAsSelectAll() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("nationkey", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "nationkey"))), @@ -770,8 +764,8 @@ public void testOutputColumnsForCreateTableAsSelectAll() public void testOutputColumnsForCreateTableAsSelectAllFromView() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_view", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_view").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_column", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("mock", "default", "test_view", "test_column")))); @@ -781,8 +775,8 @@ public void testOutputColumnsForCreateTableAsSelectAllFromView() public void testOutputColumnsForCreateTableAsSelectAllFromMaterializedView() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_materialized_view", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_materialized_view").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_column", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("mock", "default", "test_materialized_view", "test_column")))); @@ -792,8 +786,8 @@ public void testOutputColumnsForCreateTableAsSelectAllFromMaterializedView() public void testOutputColumnsForCreateTableAsSelectWithAliasedColumn() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table(aliased_bigint, aliased_varchar) AS SELECT nationkey AS keynation, concat(name, comment) FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table(aliased_bigint, aliased_varchar) AS SELECT nationkey AS keynation, concat(name, comment) FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("aliased_bigint", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "nationkey"))), @@ -1031,8 +1025,8 @@ public void testOutputColumnsWithCorrelatedQueries() public void testOutputColumnsForInsertingSingleColumn() throws Exception { - runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_bigint) SELECT nationkey + 1 AS test_bigint FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_bigint) SELECT nationkey + 1 AS test_bigint FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_bigint", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "nationkey")))); @@ -1042,8 +1036,8 @@ public void testOutputColumnsForInsertingSingleColumn() public void testOutputColumnsForInsertingAliasedColumn() throws Exception { - runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) SELECT name AS aliased_name, nationkey AS aliased_varchar FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) SELECT name AS aliased_name, nationkey AS aliased_varchar FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_varchar", VARCHAR_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "name"))), @@ -1054,8 +1048,8 @@ public void testOutputColumnsForInsertingAliasedColumn() public void testOutputColumnsForUpdatingAllColumns() throws Exception { - runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 'reset', test_bigint = 1", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 'reset', test_bigint = 1").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_varchar", VARCHAR_TYPE, ImmutableSet.of()), @@ -1066,8 +1060,8 @@ public void testOutputColumnsForUpdatingAllColumns() public void testOutputColumnsForUpdatingSingleColumn() throws Exception { - runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 're-reset' WHERE test_bigint = 1", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 're-reset' WHERE test_bigint = 1").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly(new OutputColumnMetadata("test_varchar", VARCHAR_TYPE, ImmutableSet.of())); } @@ -1076,8 +1070,8 @@ public void testOutputColumnsForUpdatingSingleColumn() public void testCreateTable() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT)", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT)").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); assertThat(event.getIoMetadata().getOutput().get().getTable()).isEqualTo("create_simple_table"); @@ -1089,8 +1083,8 @@ public void testCreateTable() public void testCreateTableLike() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT, LIKE mock.default.test_table)", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT, LIKE mock.default.test_table)").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); assertThat(event.getIoMetadata().getOutput().get().getTable()).isEqualTo("create_simple_table"); @@ -1105,8 +1099,8 @@ public void testCreateTableLike() public void testConnectorMetrics() throws Exception { - runQueryAndWaitForEvents("SELECT * FROM mock.tiny.nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT * FROM mock.tiny.nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List connectorMetrics = event.getIoMetadata().getInputs().stream() .map(QueryInputMetadata::getConnectorMetrics) .collect(toImmutableList()); @@ -1149,25 +1143,18 @@ public Object[][] setOperator() private void assertLineage(String baseQuery, Set inputTables, OutputColumnMetadata... outputColumnMetadata) throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents("CREATE VIEW mock.default.create_new_view AS " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents("CREATE VIEW mock.default.create_new_materialized_view AS " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents(format("DELETE FROM mock.default.table_for_output WHERE EXISTS (%s) ", baseQuery), 2); - assertLineage(inputTables); + assertLineageInternal("CREATE TABLE mock.default.create_new_table AS " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal("CREATE VIEW mock.default.create_new_view AS " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal("CREATE VIEW mock.default.create_new_materialized_view AS " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal(format("DELETE FROM mock.default.table_for_output WHERE EXISTS (%s) ", baseQuery), inputTables); } - private void assertLineage(Set inputTables, OutputColumnMetadata... outputColumnMetadata) + private void assertLineageInternal(String sql, Set inputTables, OutputColumnMetadata... outputColumnMetadata) + throws Exception { - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents(sql).getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getMetadata().getTables()) .map(TestEventListenerBasic::getQualifiedName) .containsExactlyInAnyOrderElementsOf(inputTables); diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java index 294b4f2346c2..8ec8d05875e7 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java @@ -16,8 +16,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.Duration; import io.trino.Session; import io.trino.connector.MockConnectorFactory; +import io.trino.execution.EventsAwaitingQueries.MaterializedResultWithEvents; +import io.trino.execution.EventsCollector.QueryEvents; import io.trino.execution.TestEventListenerPlugin.TestingEventListenerPlugin; import io.trino.plugin.resourcegroups.ResourceGroupManagerPlugin; import io.trino.plugin.tpch.TpchPlugin; @@ -31,12 +34,10 @@ import io.trino.spi.resourcegroups.QueryType; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.testng.annotations.Test; -import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.Set; @@ -45,6 +46,7 @@ import static io.trino.execution.TestQueues.createResourceGroupId; import static io.trino.plugin.tpch.TpchConnectorFactory.TPCH_SPLITS_PER_NODE; import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -92,7 +94,7 @@ public Iterable getConnectorFactories() queryRunner.getCoordinator().getResourceGroupManager().get() .setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json"))); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); return queryRunner; } @@ -106,12 +108,9 @@ private String getResourceFilePath(String fileName) public void testSplitsForNormalQuery() throws Exception { - // We expect the following events - // QueryCreated: 1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split - int expectedEvents = 1 + 1 + SPLITS_PER_NODE + 1 + 1; - runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem", expectedEvents); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem").getQueryEvents(); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + QueryCreatedEvent queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -119,7 +118,7 @@ public void testSplitsForNormalQuery() assertEquals(queryCreatedEvent.getMetadata().getQuery(), "SELECT sum(linenumber) FROM lineitem"); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getIoMetadata().getOutput(), Optional.empty()); @@ -130,7 +129,7 @@ public void testSplitsForNormalQuery() assertFalse(queryCompletedEvent.getMetadata().getPreparedQuery().isPresent()); assertEquals(queryCompletedEvent.getStatistics().getCompletedSplits(), SPLITS_PER_NODE + 2); - List splitCompletedEvents = generatedEvents.getSplitCompletedEvents(); + List splitCompletedEvents = queryEvents.waitForSplitCompletedEvents(SPLITS_PER_NODE + 2, new Duration(30, SECONDS)); assertEquals(splitCompletedEvents.size(), SPLITS_PER_NODE + 2); // leaf splits + aggregation split // All splits must have the same query ID @@ -145,8 +144,8 @@ public void testSplitsForNormalQuery() .mapToLong(e -> e.getStatistics().getCompletedPositions()) .sum(); - MaterializedResult result = runQueryAndWaitForEvents("SELECT count(*) FROM lineitem", expectedEvents); - long expectedCompletedPositions = (long) result.getMaterializedRows().get(0).getField(0); + MaterializedResultWithEvents result = runQueryAndWaitForEvents("SELECT count(*) FROM lineitem"); + long expectedCompletedPositions = (long) result.getMaterializedResult().getMaterializedRows().get(0).getField(0); assertEquals(actualCompletedPositions, expectedCompletedPositions); QueryStatistics statistics = queryCompletedEvent.getStatistics(); @@ -191,9 +190,9 @@ public void testSplitsForConstantQuery() throws Exception { // QueryCreated: 1, QueryCompleted: 1, Splits: 1 - runQueryAndWaitForEvents("SELECT 1", 3); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT 1").getQueryEvents(); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + QueryCreatedEvent queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -202,7 +201,7 @@ public void testSplitsForConstantQuery() assertEquals(queryCreatedEvent.getMetadata().getQuery(), "SELECT 1"); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getStatistics().getTotalRows(), 0L); @@ -211,14 +210,14 @@ public void testSplitsForConstantQuery() assertFalse(queryCompletedEvent.getMetadata().getPreparedQuery().isPresent()); assertEquals(queryCompletedEvent.getContext().getQueryType().get(), QueryType.SELECT); - List splitCompletedEvents = generatedEvents.getSplitCompletedEvents(); + List splitCompletedEvents = queryEvents.waitForSplitCompletedEvents(1, new Duration(30, SECONDS)); assertEquals(splitCompletedEvents.get(0).getQueryId(), queryCompletedEvent.getMetadata().getQueryId()); assertEquals(splitCompletedEvents.get(0).getStatistics().getCompletedPositions(), 1); } - private MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected) + private MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql) throws Exception { - return queries.runQueryAndWaitForEvents(sql, numEventsExpected, getSession()); + return queries.runQueryAndWaitForEvents(sql, getSession()); } }