diff --git a/core/trino-main/src/main/java/io/trino/testing/MaterializedResultWithQueryId.java b/core/trino-main/src/main/java/io/trino/testing/MaterializedResultWithQueryId.java new file mode 100644 index 000000000000..f070ad10c613 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/testing/MaterializedResultWithQueryId.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.testing; + +import io.trino.spi.QueryId; + +import static java.util.Objects.requireNonNull; + +public class MaterializedResultWithQueryId +{ + private final QueryId queryId; + private final MaterializedResult result; + + public MaterializedResultWithQueryId(QueryId queryId, MaterializedResult result) + { + this.queryId = requireNonNull(queryId, "queryId is null"); + this.result = requireNonNull(result, "result is null"); + } + + public QueryId getQueryId() + { + return queryId; + } + + public MaterializedResult getResult() + { + return result; + } +} diff --git a/core/trino-main/src/main/java/io/trino/testing/QueryFailedException.java b/core/trino-main/src/main/java/io/trino/testing/QueryFailedException.java new file mode 100644 index 000000000000..6ba9b4396736 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/testing/QueryFailedException.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.testing; + +import io.trino.spi.QueryId; + +public class QueryFailedException + extends RuntimeException +{ + private final QueryId queryId; + + public QueryFailedException(QueryId queryId, String message) + { + super(message); + this.queryId = queryId; + } + + public QueryFailedException(QueryId queryId, String message, Throwable cause) + { + super(message, cause); + this.queryId = queryId; + } + + public QueryId getQueryId() + { + return queryId; + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index 7384785872cb..f98b218ba982 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -30,8 +30,8 @@ import io.trino.testing.BaseConnectorSmokeTest; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.tpch.TpchTable; import org.intellij.lang.annotations.Language; @@ -327,9 +327,9 @@ public void testInputDataSize() hiveTableName, getLocationForTable(bucketName, "foo"))); - ResultWithQueryId deltaResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), "SELECT * FROM foo"); + MaterializedResultWithQueryId deltaResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), "SELECT * FROM foo"); assertEquals(deltaResult.getResult().getRowCount(), 2); - ResultWithQueryId hiveResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), format("SELECT * FROM %s.%s.%s", "hive", SCHEMA, hiveTableName)); + MaterializedResultWithQueryId hiveResult = queryRunner.executeWithQueryId(broadcastJoinDistribution(true), format("SELECT * FROM %s.%s.%s", "hive", SCHEMA, hiveTableName)); assertEquals(hiveResult.getResult().getRowCount(), 2); QueryManager queryManager = queryRunner.getCoordinator().getQueryManager(); @@ -1544,7 +1544,7 @@ private void invalidateMetadataCache(String tableName) private void testCountQuery(@Language("SQL") String sql, long expectedRowCount, long expectedSplitCount) { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); assertEquals(result.getResult().getOnlyColumnAsSet(), ImmutableSet.of(expectedRowCount)); verifySplitCount(result.getQueryId(), expectedSplitCount); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index 2becfdf47756..2f3e8eff6e80 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -23,9 +23,9 @@ import io.trino.testing.BaseConnectorTest; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.tpch.TpchTable; @@ -336,7 +336,7 @@ public void testTimestampPredicatePushdown(String value) assertUpdate("INSERT INTO " + tableName + " VALUES (TIMESTAMP '" + value + "')", 1); DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId( + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId( getSession(), "SELECT * FROM " + tableName + " WHERE t < TIMESTAMP '" + value + "'"); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputDataSize().toBytes(), 0); @@ -394,7 +394,7 @@ public void testAddColumnToPartitionedTable() } } - private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId queryResult) + private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, MaterializedResultWithQueryId queryResult) { return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java index 0b919e51e839..6e564662659e 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java @@ -19,9 +19,8 @@ import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.spi.QueryId; import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.sql.TestTable; import org.testng.annotations.Test; @@ -498,7 +497,7 @@ public void testDropStatsAccessControl() private void runAnalyzeVerifySplitCount(String tableName, long expectedSplitCount) { - ResultWithQueryId analyzeResult = getDistributedQueryRunner().executeWithQueryId(getSession(), "ANALYZE " + tableName); + MaterializedResultWithQueryId analyzeResult = getDistributedQueryRunner().executeWithQueryId(getSession(), "ANALYZE " + tableName); verifySplitCount(analyzeResult.getQueryId(), expectedSplitCount); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java index 1bfebe45ddf0..91b279e5d424 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.java @@ -32,9 +32,8 @@ import io.trino.split.SplitSource; import io.trino.sql.planner.OptimizerConfig.JoinDistributionType; import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.transaction.TransactionId; import io.trino.transaction.TransactionManager; import org.testng.annotations.DataProvider; @@ -107,8 +106,8 @@ public Object[][] joinDistributionTypes() public void testDynamicFiltering(JoinDistributionType joinDistributionType) { String query = "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice > 59995 AND orders.totalprice < 60000"; - ResultWithQueryId filteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(true, joinDistributionType), query); - ResultWithQueryId unfilteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(false, joinDistributionType), query); + MaterializedResultWithQueryId filteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(true, joinDistributionType), query); + MaterializedResultWithQueryId unfilteredResult = getDistributedQueryRunner().executeWithQueryId(sessionWithDynamicFiltering(false, joinDistributionType), query); assertEqualsIgnoreOrder(filteredResult.getResult().getMaterializedRows(), unfilteredResult.getResult().getMaterializedRows()); QueryInputStats filteredStats = getQueryInputStats(filteredResult.getQueryId()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java index 544885d5c26f..ccb7cf5b8397 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java @@ -19,9 +19,9 @@ import io.trino.spi.QueryId; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import org.testng.annotations.Test; import org.testng.asserts.SoftAssert; @@ -143,7 +143,7 @@ public void testUpdatePushdown() */ private void assertPushdown(String actual, String expected, long countProcessed) { - ResultWithQueryId result = executeWithQueryId(actual); + MaterializedResultWithQueryId result = executeWithQueryId(actual); Set actualRows = Set.copyOf(result.getResult().getMaterializedRows()); Set expectedRows = Set.copyOf( computeExpected(expected, result.getResult().getTypes()).getMaterializedRows()); @@ -176,7 +176,7 @@ private void assertPushdown(String actual, String expected, long countProcessed) */ private void assertPushdownUpdate(String sql, long count, long countProcessed) { - ResultWithQueryId result = executeWithQueryId(sql); + MaterializedResultWithQueryId result = executeWithQueryId(sql); OptionalLong actualCount = result.getResult().getUpdateCount(); SoftAssert softly = new SoftAssert(); @@ -189,7 +189,7 @@ private void assertPushdownUpdate(String sql, long count, long countProcessed) softly.assertAll(); } - private ResultWithQueryId executeWithQueryId(String sql) + private MaterializedResultWithQueryId executeWithQueryId(String sql) { return getDistributedQueryRunner().executeWithQueryId(getSession(), sql); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java index 5c156e52f469..effb9059fd0c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestSplitPruning.java @@ -20,7 +20,6 @@ import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import org.intellij.lang.annotations.Language; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -135,10 +134,10 @@ public void testStatsPruningNaN(String type) Set.of(), 0); - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResult result = getDistributedQueryRunner().execute( getSession(), format("SELECT name FROM %s WHERE val IS NOT NULL", tableName)); - assertEquals(result.getResult().getOnlyColumnAsSet(), Set.of("a5", "b5", "a6", "b6")); + assertEquals(result.getOnlyColumnAsSet(), Set.of("a5", "b5", "a6", "b6")); } @Test diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index efa1dd1fc3d4..b16bf81e9063 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -53,9 +53,9 @@ import io.trino.testing.BaseConnectorTest; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.testing.sql.TrinoSqlExecutor; @@ -4725,7 +4725,7 @@ private void doTestParquetTimestampPredicatePushdown(Session baseSession, HiveTi assertQuery(session, "SELECT * FROM " + tableName, format("VALUES (%s)", formatTimestamp(value))); DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId( + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId( session, format("SELECT * FROM %s WHERE t < %s", tableName, formatTimestamp(value))); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputDataSize().toBytes(), 0); @@ -4756,7 +4756,7 @@ public void testOrcTimestampPredicatePushdown(HiveTimestampPrecision timestampPr // to account for the fact that ORC stats are stored at millisecond precision and Trino rounds timestamps, // we filter by timestamps that differ from the actual value by at least 1ms, to observe pruning DistributedQueryRunner queryRunner = getDistributedQueryRunner(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId( + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId( session, format("SELECT * FROM test_orc_timestamp_predicate_pushdown WHERE t < %s", formatTimestamp(value.minusNanos(MILLISECONDS.toNanos(1))))); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputDataSize().toBytes(), 0); @@ -4845,7 +4845,7 @@ private void assertNoDataRead(@Language("SQL") String sql) results -> assertThat(results.getRowCount()).isEqualTo(0)); } - private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId queryResult) + private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, MaterializedResultWithQueryId queryResult) { return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId()); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java index d716490c909a..3ef04090548e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDistributedJoinQueries.java @@ -18,9 +18,8 @@ import io.trino.metadata.QualifiedObjectName; import io.trino.operator.OperatorStats; import io.trino.testing.AbstractTestJoinQueries; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import org.testng.annotations.Test; import static com.google.common.base.Verify.verify; @@ -59,7 +58,7 @@ public void testJoinWithEmptyBuildSide() Session session = Session.builder(getSession()) .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) .build(); - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( session, "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 123.4567"); assertEquals(result.getResult().getRowCount(), 0); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 02eb7529af43..7f7b4153812a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -36,9 +36,9 @@ import io.trino.testing.BaseConnectorTest; import io.trino.testing.DataProviders; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.tpch.TpchTable; @@ -3277,15 +3277,15 @@ public void testSplitPruningFromDataFileStatistics(DataMappingTestSetup testSetu private void verifySplitCount(String query, int expectedSplitCount) { - ResultWithQueryId selectAllPartitionsResult = getDistributedQueryRunner().executeWithQueryId(getSession(), query); + MaterializedResultWithQueryId selectAllPartitionsResult = getDistributedQueryRunner().executeWithQueryId(getSession(), query); assertEqualsIgnoreOrder(selectAllPartitionsResult.getResult().getMaterializedRows(), computeActual(withoutPredicatePushdown(getSession()), query).getMaterializedRows()); verifySplitCount(selectAllPartitionsResult.getQueryId(), expectedSplitCount); } private void verifyPredicatePushdownDataRead(@Language("SQL") String query, boolean supportsPushdown) { - ResultWithQueryId resultWithPredicatePushdown = getDistributedQueryRunner().executeWithQueryId(getSession(), query); - ResultWithQueryId resultWithoutPredicatePushdown = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId resultWithPredicatePushdown = getDistributedQueryRunner().executeWithQueryId(getSession(), query); + MaterializedResultWithQueryId resultWithoutPredicatePushdown = getDistributedQueryRunner().executeWithQueryId( withoutPredicatePushdown(getSession()), query); diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java index faeeea04fbb8..fcc2b5fa607a 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java @@ -19,9 +19,8 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.kafka.TestingKafka; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -93,7 +92,7 @@ public void testPartitionPushDown() String sql = format("SELECT count(*) FROM default.%s WHERE _partition_id=1", topicNamePartition); assertEventually(() -> { - ResultWithQueryId queryResult = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); assertEquals(getQueryInfo(getDistributedQueryRunner(), queryResult).getQueryStats().getProcessedInputPositions(), MESSAGE_NUM / 2); }); } @@ -111,7 +110,7 @@ private void assertProcessedInputPossitions(String sql, long expectedProcessedIn { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); assertEventually(() -> { - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); assertEquals(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions(), expectedProcessedInputPositions); }); } @@ -131,7 +130,7 @@ public void testTimestampCreateTimeModePushDown() // timestamp_upper_bound_force_push_down_enabled default as false. assertEventually(() -> { - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions()) .isEqualTo(998); }); @@ -143,7 +142,7 @@ public void testTimestampCreateTimeModePushDown() .setSystemProperty("kafka.timestamp_upper_bound_force_push_down_enabled", "true") .build(); - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(sessionWithUpperBoundPushDownEnabled, sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(sessionWithUpperBoundPushDownEnabled, sql); assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions()) .isEqualTo(2); }); @@ -163,13 +162,13 @@ public void testTimestampLogAppendModePushDown() recordMessage.getEndTime()); assertEventually(() -> { - ResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId queryResult = queryRunner.executeWithQueryId(getSession(), sql); assertThat(getQueryInfo(queryRunner, queryResult).getQueryStats().getProcessedInputPositions()) .isEqualTo(2); }); } - private static QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId queryResult) + private static QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, MaterializedResultWithQueryId queryResult) { return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId()); } diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java index 386c995bcde6..31ce6614458d 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java @@ -30,9 +30,8 @@ import io.trino.split.SplitSource; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.tpch.TpchTable; import io.trino.transaction.TransactionId; import io.trino.transaction.TransactionManager; @@ -180,7 +179,7 @@ public void testJoinDynamicFilteringBlockProbeSide() private void assertDynamicFiltering(@Language("SQL") String selectQuery, Session session, int expectedRowCount, int... expectedOperatorRowsRead) { DistributedQueryRunner runner = getDistributedQueryRunner(); - ResultWithQueryId result = runner.executeWithQueryId(session, selectQuery); + MaterializedResultWithQueryId result = runner.executeWithQueryId(session, selectQuery); assertEquals(result.getResult().getRowCount(), expectedRowCount); assertEquals(getOperatorRowsRead(runner, result.getQueryId()), Ints.asList(expectedOperatorRowsRead)); diff --git a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java index d94676bc3288..fe8e47d63b91 100644 --- a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java +++ b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java @@ -25,9 +25,8 @@ import io.trino.spi.metrics.Metrics; import io.trino.testing.BaseConnectorTest; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.testng.services.Flaky; @@ -187,7 +186,7 @@ public void testExplainCustomMetricsScanFilter() private Metrics collectCustomMetrics(String sql) { DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId result = runner.executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId result = runner.executeWithQueryId(getSession(), sql); return runner .getCoordinator() .getQueryManager() @@ -202,7 +201,7 @@ private Metrics collectCustomMetrics(String sql) @Test(timeOut = 30_000) public void testPhysicalInputPositions() { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), "SELECT * FROM lineitem JOIN tpch.tiny.supplier ON lineitem.suppkey = supplier.suppkey " + "AND supplier.name = 'Supplier#000000001'"); @@ -455,7 +454,7 @@ public void testJoinDynamicFilteringMultiJoin(JoinDistributionType joinDistribut private void assertDynamicFiltering(@Language("SQL") String selectQuery, Session session, int expectedRowCount, int... expectedOperatorRowsRead) { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(session, selectQuery); + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(session, selectQuery); assertEquals(result.getResult().getRowCount(), expectedRowCount); assertEquals(getOperatorRowsRead(getDistributedQueryRunner(), result.getQueryId()), Ints.asList(expectedOperatorRowsRead)); diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java index 7f82cb2dfce0..4e979a93c89f 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java @@ -29,8 +29,8 @@ import io.trino.spi.QueryId; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.tpch.TpchTable; import org.assertj.core.api.AbstractThrowableAssert; import org.intellij.lang.annotations.Language; @@ -563,7 +563,7 @@ private ExecutionResult execute(Session session, String query, Optional String tableName = "table_" + randomTableSuffix(); setup.ifPresent(sql -> getQueryRunner().execute(noRetries(session), resolveTableName(sql, tableName))); - ResultWithQueryId resultWithQueryId = null; + MaterializedResultWithQueryId resultWithQueryId = null; RuntimeException failure = null; try { resultWithQueryId = getDistributedQueryRunner().executeWithQueryId(withTraceToken(session, traceToken), resolveTableName(query, tableName)); @@ -722,7 +722,7 @@ private static class ExecutionResult private final Optional updatedTableStatistics; private ExecutionResult( - ResultWithQueryId resultWithQueryId, + MaterializedResultWithQueryId resultWithQueryId, Optional updatedTableContent, Optional updatedTableStatistics) { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java index 9c9583bbed62..541a53f820aa 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestJoinQueries.java @@ -2349,7 +2349,7 @@ public void testOutputDuplicatesInsensitiveJoin() private void assertJoinOutputPositions(@Language("SQL") String sql, int expectedJoinOutputPositions) { - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( Session.builder(getSession()) .setSystemProperty(JOIN_REORDERING_STRATEGY, "NONE") .build(), diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java index b4b660b9a2d2..85da0e30216e 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java @@ -465,7 +465,7 @@ protected void assertQueryStats( Consumer resultAssertion) { DistributedQueryRunner queryRunner = getDistributedQueryRunner(); - ResultWithQueryId resultWithQueryId = queryRunner.executeWithQueryId(session, query); + MaterializedResultWithQueryId resultWithQueryId = queryRunner.executeWithQueryId(session, query); QueryStats queryStats = queryRunner.getCoordinator() .getQueryManager() .getFullQueryInfo(resultWithQueryId.getQueryId()) diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java index ea845f4d17ee..29f744740f8a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java @@ -19,7 +19,6 @@ import io.trino.client.ClientSelectedRole; import io.trino.client.ClientSession; import io.trino.client.Column; -import io.trino.client.QueryError; import io.trino.client.QueryStatusInfo; import io.trino.client.StatementClient; import io.trino.metadata.MetadataUtil; @@ -82,11 +81,13 @@ public void close() protected abstract ResultsSession getResultSession(Session session); public ResultWithQueryId execute(@Language("SQL") String sql) + throws QueryFailedException { return execute(defaultSession, sql); } public ResultWithQueryId execute(Session session, @Language("SQL") String sql) + throws QueryFailedException { ResultsSession resultsSession = getResultSession(session); @@ -99,10 +100,10 @@ public ResultWithQueryId execute(Session session, @Language("SQL") String sql } checkState(client.isFinished()); - QueryError error = client.finalStatusInfo().getError(); + QueryStatusInfo results = client.finalStatusInfo(); + QueryId queryId = new QueryId(results.getId()); - if (error == null) { - QueryStatusInfo results = client.finalStatusInfo(); + if (results.getError() == null) { if (results.getUpdateType() != null) { resultsSession.setUpdateType(results.getUpdateType()); } @@ -114,14 +115,14 @@ public ResultWithQueryId execute(Session session, @Language("SQL") String sql resultsSession.setStatementStats(results.getStats()); T result = resultsSession.build(client.getSetSessionProperties(), client.getResetSessionProperties()); - return new ResultWithQueryId<>(new QueryId(results.getId()), result); + return new ResultWithQueryId<>(queryId, result); } - if (error.getFailureInfo() != null) { - RuntimeException remoteException = error.getFailureInfo().toException(); - throw new RuntimeException(Optional.ofNullable(remoteException.getMessage()).orElseGet(remoteException::toString), remoteException); + if (results.getError().getFailureInfo() != null) { + RuntimeException remoteException = results.getError().getFailureInfo().toException(); + throw new QueryFailedException(queryId, Optional.ofNullable(remoteException.getMessage()).orElseGet(remoteException::toString), remoteException); } - throw new RuntimeException("Query failed: " + error.getMessage()); + throw new QueryFailedException(queryId, "Query failed: " + results.getError().getMessage()); // dump query info to console for debugging (NOTE: not pretty printed) // JsonCodec queryInfoJsonCodec = createCodecFactory().prettyPrint().jsonCodec(QueryInfo.class); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 4b4065f91c1f..6619dce8650a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -3408,7 +3408,7 @@ public void testWrittenStats() String tableName = "test_written_stats_" + randomTableSuffix(); try { String sql = "CREATE TABLE " + tableName + " AS SELECT * FROM nation"; - ResultWithQueryId resultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); + MaterializedResultWithQueryId resultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(getSession(), sql); QueryInfo queryInfo = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(resultResultWithQueryId.getQueryId()); assertEquals(queryInfo.getQueryStats().getOutputPositions(), 1L); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java index 64eb3137cb12..f650072fccf2 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java @@ -99,7 +99,7 @@ protected Session getSession() public void testJoinWithEmptyBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -124,7 +124,7 @@ public void testJoinWithSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey " + "AND supplier.name = 'Supplier#000000001'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -148,7 +148,7 @@ public void testJoinWithSelectiveBuildSide() public void testJoinWithNonSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -173,7 +173,7 @@ public void testJoinWithNonSelectiveBuildSide() public void testJoinLargeBuildSideRangeDynamicFiltering() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN orders ON partitioned_lineitem.orderkey = orders.orderkey"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -204,7 +204,7 @@ public void testJoinWithMultipleDynamicFiltersOnProbe() "SELECT supplier.suppkey FROM " + "partitioned_lineitem JOIN tpch.tiny.supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name IN ('Supplier#000000001', 'Supplier#000000002')" + ") t JOIN supplier ON t.suppkey = supplier.suppkey AND supplier.suppkey IN (2, 3)"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -237,7 +237,7 @@ public void testJoinWithImplicitCoercion() "VALUES " + LINEITEM_COUNT); @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem_int l JOIN supplier s ON l.suppkey_int = s.suppkey AND s.name = 'Supplier#000000001'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -260,7 +260,7 @@ public void testJoinWithImplicitCoercion() public void testSemiJoinWithEmptyBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'abc')"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -283,7 +283,7 @@ public void testSemiJoinWithEmptyBuildSide() public void testSemiJoinWithSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'Supplier#000000001')"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -307,7 +307,7 @@ public void testSemiJoinWithSelectiveBuildSide() public void testSemiJoinWithNonSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier)"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -332,7 +332,7 @@ public void testSemiJoinWithNonSelectiveBuildSide() public void testSemiJoinLargeBuildSideRangeDynamicFiltering() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE orderkey IN (SELECT orderkey FROM orders)"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -359,7 +359,7 @@ public void testSemiJoinLargeBuildSideRangeDynamicFiltering() public void testRightJoinWithEmptyBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'abc'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -382,7 +382,7 @@ public void testRightJoinWithEmptyBuildSide() public void testRightJoinWithSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'Supplier#000000001'"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -406,7 +406,7 @@ public void testRightJoinWithSelectiveBuildSide() public void testRightJoinWithNonSelectiveBuildSide() { @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey"; - ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( getSession(), selectQuery); MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); @@ -494,7 +494,7 @@ private void assertDynamicFilters(Session session, @Language("SQL") String query private long getQueryInputPositions(Session session, @Language("SQL") String sql, int expectedRowCount) { DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); - ResultWithQueryId result = runner.executeWithQueryId(session, sql); + MaterializedResultWithQueryId result = runner.executeWithQueryId(session, sql); assertThat(result.getResult().getRowCount()).isEqualTo(expectedRowCount); QueryId queryId = result.getQueryId(); QueryStats stats = runner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats(); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java index 8fb63a79cc42..3751180cc0de 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java @@ -536,11 +536,12 @@ public MaterializedResult execute(Session session, @Language("SQL") String sql) } } - public ResultWithQueryId executeWithQueryId(Session session, @Language("SQL") String sql) + public MaterializedResultWithQueryId executeWithQueryId(Session session, @Language("SQL") String sql) { lock.readLock().lock(); try { - return trinoClient.execute(session, sql); + ResultWithQueryId result = trinoClient.execute(session, sql); + return new MaterializedResultWithQueryId(result.getQueryId(), result.getResult()); } finally { lock.readLock().unlock(); @@ -550,7 +551,7 @@ public ResultWithQueryId executeWithQueryId(Session session, @Override public MaterializedResultWithPlan executeWithPlan(Session session, String sql, WarningCollector warningCollector) { - ResultWithQueryId resultWithQueryId = executeWithQueryId(session, sql); + MaterializedResultWithQueryId resultWithQueryId = executeWithQueryId(session, sql); return new MaterializedResultWithPlan(resultWithQueryId.getResult().toTestTypes(), getQueryPlan(resultWithQueryId.getQueryId())); } diff --git a/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java b/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java index 68a0404c628c..24ca576a6f93 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/EventsAwaitingQueries.java @@ -13,46 +13,51 @@ */ package io.trino.execution; +import io.airlift.units.Duration; import io.trino.Session; +import io.trino.execution.EventsCollector.QueryEvents; +import io.trino.spi.QueryId; +import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; -import io.trino.testing.QueryRunner; +import io.trino.testing.MaterializedResultWithQueryId; +import io.trino.testing.QueryFailedException; import org.intellij.lang.annotations.Language; -import java.time.Duration; import java.util.Optional; import static com.google.common.base.Strings.nullToEmpty; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.testng.Assert.fail; class EventsAwaitingQueries { private final EventsCollector eventsCollector; - private final QueryRunner queryRunner; - private final Duration extraWaitTime; + private final DistributedQueryRunner queryRunner; - EventsAwaitingQueries(EventsCollector eventsCollector, QueryRunner queryRunner, Duration extraWaitTime) + EventsAwaitingQueries(EventsCollector eventsCollector, DistributedQueryRunner queryRunner) { this.eventsCollector = requireNonNull(eventsCollector, "eventsCollector is null"); this.queryRunner = requireNonNull(queryRunner, "queryRunner is null"); - this.extraWaitTime = extraWaitTime; } - MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected, Session session) + MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql, Session session) throws Exception { - return runQueryAndWaitForEvents(sql, numEventsExpected, session, Optional.empty()); + return runQueryAndWaitForEvents(sql, session, Optional.empty()); } - MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected, Session session, Optional expectedExceptionRegEx) + MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql, Session session, Optional expectedExceptionRegEx) throws Exception { - eventsCollector.reset(numEventsExpected); + QueryId queryId = null; MaterializedResult result = null; try { - result = queryRunner.execute(session, sql); + MaterializedResultWithQueryId materializedResultWithQueryId = queryRunner.executeWithQueryId(session, sql); + queryId = materializedResultWithQueryId.getQueryId(); + result = materializedResultWithQueryId.getResult(); } catch (RuntimeException exception) { if (expectedExceptionRegEx.isPresent()) { @@ -64,12 +69,43 @@ MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int num else { throw exception; } + if (exception instanceof QueryFailedException) { + queryId = ((QueryFailedException) exception).getQueryId(); + } + } + if (queryId == null) { + return null; } - eventsCollector.waitForEvents(10); + QueryEvents queryEvents = eventsCollector.getQueryEvents(queryId); + queryEvents.waitForQueryCompletion(new Duration(3, SECONDS)); + // Sleep some more so extraneous, unexpected events can be recorded too. // This is not rock solid but improves effectiveness on detecting duplicate events. - Thread.sleep(extraWaitTime.toMillis()); - return result; + SECONDS.sleep(1); + + return new MaterializedResultWithEvents(result, queryEvents); + } + + public static class MaterializedResultWithEvents + { + private final MaterializedResult materializedResult; + private final QueryEvents queryEvents; + + public MaterializedResultWithEvents(MaterializedResult materializedResult, QueryEvents queryEvents) + { + this.materializedResult = materializedResult; + this.queryEvents = requireNonNull(queryEvents, "queryEvents is null"); + } + + public MaterializedResult getMaterializedResult() + { + return materializedResult; + } + + public QueryEvents getQueryEvents() + { + return queryEvents; + } } } diff --git a/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java b/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java index d5fab70e9461..98c7ba9d36ec 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/EventsCollector.java @@ -14,6 +14,8 @@ package io.trino.execution; import com.google.common.collect.ImmutableList; +import io.airlift.units.Duration; +import io.trino.spi.QueryId; import io.trino.spi.eventlistener.QueryCompletedEvent; import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.spi.eventlistener.SplitCompletedEvent; @@ -21,164 +23,163 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; +import java.util.concurrent.TimeoutException; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; @ThreadSafe final class EventsCollector { - private final EventFilters eventFilters; - @GuardedBy("this") - private ImmutableList.Builder queryCreatedEvents; - @GuardedBy("this") - private ImmutableList.Builder queryCompletedEvents; - @GuardedBy("this") - private ImmutableList.Builder splitCompletedEvents; - @GuardedBy("this") - private CountDownLatch eventsLatch; - - public EventsCollector() - { - this(EventFilters.builder().build()); - } - - public EventsCollector(EventFilters eventFilters) - { - this.eventFilters = requireNonNull(eventFilters, "eventFilters is null"); - reset(0); - } - - public synchronized void reset(int numEvents) - { - queryCreatedEvents = ImmutableList.builder(); - queryCompletedEvents = ImmutableList.builder(); - splitCompletedEvents = ImmutableList.builder(); - - eventsLatch = new CountDownLatch(numEvents); - } - - public void waitForEvents(int timeoutSeconds) - throws InterruptedException - { - CountDownLatch eventsLatch; - synchronized (this) { - // since the eventsLatch is replaced in the reset method, a lock is required for proper memory visibility - eventsLatch = this.eventsLatch; - } - eventsLatch.await(timeoutSeconds, TimeUnit.SECONDS); - } + private final ConcurrentHashMap queryEvents = new ConcurrentHashMap<>(); public synchronized void addQueryCreated(QueryCreatedEvent event) { - if (!eventFilters.getQueryCreatedFilter().test(event)) { - return; - } - queryCreatedEvents.add(event); - eventsLatch.countDown(); + getQueryEvents(new QueryId(event.getMetadata().getQueryId())).addQueryCreated(event); } public synchronized void addQueryCompleted(QueryCompletedEvent event) { - if (!eventFilters.getQueryCompletedFilter().test(event)) { - return; - } - queryCompletedEvents.add(event); - eventsLatch.countDown(); + getQueryEvents(new QueryId(event.getMetadata().getQueryId())).addQueryCompleted(event); } public synchronized void addSplitCompleted(SplitCompletedEvent event) { - if (!eventFilters.getSplitCompletedFilter().test(event)) { - return; - } - splitCompletedEvents.add(event); - eventsLatch.countDown(); + getQueryEvents(new QueryId(event.getQueryId())).addSplitCompleted(event); } - public synchronized List getQueryCreatedEvents() + public QueryEvents getQueryEvents(QueryId queryId) { - return queryCreatedEvents.build(); - } - - public synchronized List getQueryCompletedEvents() - { - return queryCompletedEvents.build(); - } - - public synchronized List getSplitCompletedEvents() - { - return splitCompletedEvents.build(); + return queryEvents.computeIfAbsent(queryId, ignored -> new QueryEvents()); } @ThreadSafe - public static class EventFilters + public static class QueryEvents { - private final Predicate queryCreatedFilter; - private final Predicate queryCompletedFilter; - private final Predicate splitCompletedFilter; - - private EventFilters( - Predicate queryCreatedFilter, - Predicate queryCompletedFilter, - Predicate splitCompletedFilter) + @GuardedBy("this") + private QueryCreatedEvent queryCreatedEvent; + @GuardedBy("this") + private QueryCompletedEvent queryCompletedEvent; + @GuardedBy("this") + private final CountDownLatch queryCompleteLatch = new CountDownLatch(1); + + @GuardedBy("this") + private final List splitCompletedEvents = new ArrayList<>(); + @GuardedBy("this") + private CountDownLatch splitEventLatch; + + @GuardedBy("this") + private final List failures = new ArrayList<>(); + + public synchronized QueryCreatedEvent getQueryCreatedEvent() { - this.queryCreatedFilter = requireNonNull(queryCreatedFilter, "queryCreatedFilter is null"); - this.queryCompletedFilter = requireNonNull(queryCompletedFilter, "queryCompletedFilter is null"); - this.splitCompletedFilter = requireNonNull(splitCompletedFilter, "splitCompletedFilter is null"); + checkFailure(); + if (queryCreatedEvent == null) { + throw new IllegalStateException("QueryCreatedEvent has not been set"); + } + return queryCreatedEvent; } - Predicate getQueryCreatedFilter() + public synchronized QueryCompletedEvent getQueryCompletedEvent() { - return queryCreatedFilter; + checkFailure(); + if (queryCompletedEvent == null) { + throw new IllegalStateException("QueryCompletedEvent has not been set"); + } + return queryCompletedEvent; } - Predicate getQueryCompletedFilter() + private synchronized void addQueryCreated(QueryCreatedEvent event) { - return queryCompletedFilter; + requireNonNull(event, "event is null"); + if (queryCreatedEvent != null) { + failures.add(new RuntimeException("QueryCreateEvent already set")); + return; + } + queryCreatedEvent = event; + + if (queryCompletedEvent != null) { + queryCompleteLatch.countDown(); + } } - Predicate getSplitCompletedFilter() + private synchronized void addQueryCompleted(QueryCompletedEvent event) { - return splitCompletedFilter; + requireNonNull(event, "event is null"); + if (queryCompletedEvent != null) { + failures.add(new RuntimeException("QueryCompletedEvent already set")); + return; + } + queryCompletedEvent = event; + + if (queryCreatedEvent != null) { + queryCompleteLatch.countDown(); + } } - public static Builder builder() + private synchronized void addSplitCompleted(SplitCompletedEvent event) { - return new Builder(); + splitCompletedEvents.add(event); + if (splitEventLatch != null) { + splitEventLatch.countDown(); + } } - public static class Builder + public void waitForQueryCompletion(Duration timeout) + throws InterruptedException, TimeoutException { - private Predicate queryCreatedFilter = queryCreatedEvent -> true; - private Predicate queryCompletedFilter = queryCompletedEvent -> true; - private Predicate splitCompletedFilter = splitCompletedEvent -> true; - - public Builder setQueryCreatedFilter(Predicate queryCreatedFilter) - { - this.queryCreatedFilter = queryCreatedFilter; - return this; + CountDownLatch latch; + synchronized (this) { + latch = queryCompleteLatch; } - public Builder setQueryCompletedFilter(Predicate queryCompletedFilter) - { - this.queryCompletedFilter = queryCompletedFilter; - return this; + boolean finished = latch.await(timeout.toMillis(), MILLISECONDS); + if (!finished) { + throw new TimeoutException("Query did not complete in " + timeout); + } + } + + public synchronized List waitForSplitCompletedEvents(int numberOfSplitEvents, Duration timeout) + throws InterruptedException, TimeoutException + { + CountDownLatch latch; + synchronized (this) { + checkFailure(); + + if (splitCompletedEvents.size() >= numberOfSplitEvents) { + return ImmutableList.copyOf(splitCompletedEvents); + } + + if (splitEventLatch != null) { + // support for waiting multiple times is complex and currently not needed, so it has not been implemented + throw new IllegalStateException("Wait for split completion already triggered for this query"); + } + splitEventLatch = new CountDownLatch(numberOfSplitEvents - splitCompletedEvents.size()); + latch = splitEventLatch; } - public Builder setSplitCompletedFilter(Predicate splitCompletedFilter) - { - this.splitCompletedFilter = splitCompletedFilter; - return this; + boolean finished = latch.await(timeout.toMillis(), MILLISECONDS); + if (!finished) { + throw new TimeoutException("Split events did not complete in " + timeout); } - public EventFilters build() - { - return new EventFilters(queryCreatedFilter, queryCompletedFilter, splitCompletedFilter); + synchronized (this) { + checkFailure(); + return ImmutableList.copyOf(splitCompletedEvents); + } + } + + private synchronized void checkFailure() + { + if (failures.isEmpty()) { + return; } + RuntimeException exception = new RuntimeException("Event collection failed"); + failures.forEach(exception::addSuppressed); } } } diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java b/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java index 1c95d338275b..3c6e4e2e5f6e 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestCompletedEventWarnings.java @@ -15,7 +15,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; -import io.trino.execution.EventsCollector.EventFilters; +import io.trino.execution.EventsCollector.QueryEvents; import io.trino.execution.TestEventListenerPlugin.TestingEventListenerPlugin; import io.trino.execution.warnings.WarningCollectorConfig; import io.trino.spi.TrinoWarning; @@ -29,13 +29,11 @@ import org.testng.annotations.Test; import java.io.IOException; -import java.time.Duration; import java.util.List; import java.util.Set; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.SessionTestUtils.TEST_SESSION; import static org.testng.Assert.fail; @@ -44,10 +42,7 @@ public class TestCompletedEventWarnings { private static final int TEST_WARNINGS = 5; - private final EventsCollector generatedEvents = new EventsCollector(EventFilters.builder() - .setQueryCreatedFilter(event -> false) - .setSplitCompletedFilter(event -> false) - .build()); + private final EventsCollector generatedEvents = new EventsCollector(); private Closer closer; private EventsAwaitingQueries queries; @@ -63,7 +58,7 @@ public void setUp() .build(); closer.register(queryRunner); queryRunner.installPlugin(new TestingEventListenerPlugin(generatedEvents)); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); } @AfterClass(alwaysRun = true) @@ -92,9 +87,9 @@ public void testCompletedEventWarnings() private void assertWarnings(@Language("SQL") String sql, List expectedWarnings) throws Exception { - queries.runQueryAndWaitForEvents(sql, 1, TEST_SESSION); + QueryEvents queryEvents = queries.runQueryAndWaitForEvents(sql, TEST_SESSION).getQueryEvents(); - Set warnings = getOnlyElement(generatedEvents.getQueryCompletedEvents()) + Set warnings = queryEvents.getQueryCompletedEvent() .getWarnings() .stream() .map(TrinoWarning::getWarningCode) diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java b/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java index 76cae6e6d531..3dfcc0de154d 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestConnectorEventListener.java @@ -16,30 +16,21 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; import io.trino.connector.MockConnectorFactory; -import io.trino.execution.EventsCollector.EventFilters; import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; -import io.trino.spi.eventlistener.QueryCompletedEvent; -import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.testing.DistributedQueryRunner; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.IOException; -import java.time.Duration; -import java.util.List; -import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.SessionTestUtils.TEST_SESSION; -import static org.assertj.core.api.Assertions.assertThat; @Test(singleThreaded = true) public class TestConnectorEventListener { - private final EventsCollector generatedEvents = new EventsCollector(EventFilters.builder() - .setSplitCompletedFilter(event -> false) - .build()); + private final EventsCollector generatedEvents = new EventsCollector(); private Closer closer; private EventsAwaitingQueries queries; @@ -62,7 +53,7 @@ public Iterable getConnectorFactories() }); closer.register(queryRunner); queryRunner.createCatalog("mock-catalog", "mock"); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); } @AfterClass(alwaysRun = true) @@ -79,20 +70,6 @@ public void tearDown() public void testConnectorEventHandlerReceivingEvents() throws Exception { - queries.runQueryAndWaitForEvents("SELECT 1", 2, TEST_SESSION); - - List queryCreatedEvents = generatedEvents.getQueryCreatedEvents(); - List queryCompletedEvents = generatedEvents.getQueryCompletedEvents(); - List allEvents = ImmutableList.builder() - .addAll(queryCreatedEvents) - .addAll(queryCompletedEvents) - .build(); - List eventTypes = allEvents.stream().map(event -> event.getClass().getSimpleName()).sorted().collect(toImmutableList()); - assertThat(allEvents) - .size().withFailMessage(() -> "got events: " + eventTypes).isEqualTo(2); - assertThat(queryCreatedEvents) - .size().withFailMessage(() -> "got events: " + eventTypes).isEqualTo(1); - assertThat(queryCompletedEvents) - .size().withFailMessage(() -> "got events: " + eventTypes).isEqualTo(1); + queries.runQueryAndWaitForEvents("SELECT 1", TEST_SESSION).getQueryEvents(); } } diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java index d0262c935fe2..b6186425def6 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java @@ -19,7 +19,8 @@ import io.trino.Session; import io.trino.connector.MockConnectorFactory; import io.trino.connector.MockConnectorTableHandle; -import io.trino.execution.EventsCollector.EventFilters; +import io.trino.execution.EventsAwaitingQueries.MaterializedResultWithEvents; +import io.trino.execution.EventsCollector.QueryEvents; import io.trino.execution.TestEventListenerPlugin.TestingEventListenerPlugin; import io.trino.plugin.base.metrics.LongCount; import io.trino.plugin.resourcegroups.ResourceGroupManagerPlugin; @@ -46,7 +47,6 @@ import io.trino.spi.security.ViewExpression; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.testng.annotations.DataProvider; @@ -90,7 +90,6 @@ public class TestEventListenerBasic private static final String BIGINT_TYPE = BIGINT.getDisplayName(); private static final Metrics TEST_METRICS = new Metrics(ImmutableMap.of("test_metrics", new LongCount(1))); - private final EventsCollector generatedEvents = new EventsCollector(buildEventFilters()); private EventsAwaitingQueries queries; @Override @@ -104,6 +103,8 @@ protected QueryRunner createQueryRunner() .setClientInfo("{\"clientVersion\":\"testVersion\"}") .build(); + EventsCollector generatedEvents = new EventsCollector(); + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).setNodeCount(1).build(); queryRunner.installPlugin(new TpchPlugin()); queryRunner.installPlugin(new TestingEventListenerPlugin(generatedEvents)); @@ -195,20 +196,11 @@ public Iterable getConnectorFactories() queryRunner.getCoordinator().getResourceGroupManager().get() .setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json"))); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); return queryRunner; } - private static EventFilters buildEventFilters() - { - return EventFilters.builder() - .setQueryCreatedFilter(event -> !event.getMetadata().getQuery().contains(IGNORE_EVENT_MARKER)) - .setQueryCompletedFilter(event -> !event.getMetadata().getQuery().contains(IGNORE_EVENT_MARKER)) - .setSplitCompletedFilter(event -> false) - .build(); - } - private String getResourceFilePath(String fileName) { try { @@ -219,10 +211,10 @@ private String getResourceFilePath(String fileName) } } - private MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected) + private MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql) throws Exception { - return queries.runQueryAndWaitForEvents(sql, numEventsExpected, getSession()); + return queries.runQueryAndWaitForEvents(sql, getSession()); } @Test @@ -320,9 +312,9 @@ private void assertFailedQuery(@Language("SQL") String sql, String expectedFailu private void assertFailedQuery(Session session, @Language("SQL") String sql, String expectedFailure) throws Exception { - queries.runQueryAndWaitForEvents(sql, 2, session, Optional.of(expectedFailure)); + QueryEvents queryEvents = queries.runQueryAndWaitForEvents(sql, session, Optional.of(expectedFailure)).getQueryEvents(); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertEquals(queryCompletedEvent.getMetadata().getQuery(), sql); QueryFailureInfo failureInfo = queryCompletedEvent.getFailureInfo() @@ -334,9 +326,9 @@ private void assertFailedQuery(Session session, @Language("SQL") String sql, Str public void testReferencedTablesAndRoutines() throws Exception { - runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertEquals(tables.size(), 1); @@ -365,9 +357,9 @@ public void testReferencedTablesAndRoutines() public void testReferencedTablesWithViews() throws Exception { - runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_view", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_view").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertThat(tables).hasSize(2); @@ -403,9 +395,9 @@ public void testReferencedTablesWithViews() public void testReferencedTablesWithMaterializedViews() throws Exception { - runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_materialized_view", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT test_column FROM mock.default.test_materialized_view").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertThat(tables).hasSize(2); @@ -440,9 +432,9 @@ public void testReferencedTablesWithMaterializedViews() public void testReferencedTablesInCreateView() throws Exception { - runQueryAndWaitForEvents("CREATE VIEW mock.default.create_another_test_view AS SELECT * FROM nation", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE VIEW mock.default.create_another_test_view AS SELECT * FROM nation").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); @@ -471,9 +463,9 @@ public void testReferencedTablesInCreateView() public void testReferencedTablesInCreateMaterializedView() throws Exception { - runQueryAndWaitForEvents("CREATE MATERIALIZED VIEW mock.default.test_view AS SELECT * FROM nation", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE MATERIALIZED VIEW mock.default.test_view AS SELECT * FROM nation").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); @@ -502,9 +494,9 @@ public void testReferencedTablesInCreateMaterializedView() public void testReferencedTablesWithRowFilter() throws Exception { - runQueryAndWaitForEvents("SELECT 1 FROM mock.default.test_table_with_row_filter", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT 1 FROM mock.default.test_table_with_row_filter").getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List tables = event.getMetadata().getTables(); assertThat(tables).hasSize(2); @@ -540,9 +532,11 @@ public void testReferencedTablesWithRowFilter() public void testReferencedTablesWithColumnMask() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_table_with_referring_mask AS SELECT * FROM mock.default.test_table_with_column_mask", 2); + QueryEvents queryEvents = runQueryAndWaitForEvents( + "CREATE TABLE mock.default.create_table_with_referring_mask AS SELECT * FROM mock.default.test_table_with_column_mask" + ).getQueryEvents(); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); @@ -591,8 +585,8 @@ public void testReferencedColumns() throws Exception { // assert that ColumnInfos for referenced columns are present when the table was not aliased - runQueryAndWaitForEvents("SELECT name, nationkey FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT name, nationkey FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); TableInfo table = getOnlyElement(event.getMetadata().getTables()); assertEquals( @@ -602,8 +596,8 @@ public void testReferencedColumns() ImmutableSet.of("name", "nationkey")); // assert that ColumnInfos for referenced columns are present when the table was aliased - runQueryAndWaitForEvents("SELECT name, nationkey FROM nation n", 2); - event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + queryEvents = runQueryAndWaitForEvents("SELECT name, nationkey FROM nation n").getQueryEvents(); + event = queryEvents.getQueryCompletedEvent(); table = getOnlyElement(event.getMetadata().getTables()); assertEquals( @@ -613,8 +607,8 @@ public void testReferencedColumns() ImmutableSet.of("name", "nationkey")); // assert that ColumnInfos for referenced columns are present when the table was aliased and its columns were aliased - runQueryAndWaitForEvents("SELECT a, b FROM nation n(a, b, c, d)", 2); - event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + queryEvents = runQueryAndWaitForEvents("SELECT a, b FROM nation n(a, b, c, d)").getQueryEvents(); + event = queryEvents.getQueryCompletedEvent(); table = getOnlyElement(event.getMetadata().getTables()); assertEquals( @@ -632,9 +626,9 @@ public void testPrepareAndExecute() String prepareQuery = "PREPARE stmt FROM " + selectQuery; // QueryCreated: 1, QueryCompleted: 1, Splits: 0 - runQueryAndWaitForEvents(prepareQuery, 2); + QueryEvents queryEvents = runQueryAndWaitForEvents(prepareQuery).getQueryEvents(); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + QueryCreatedEvent queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -642,7 +636,7 @@ public void testPrepareAndExecute() assertEquals(queryCreatedEvent.getMetadata().getQuery(), prepareQuery); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getIoMetadata().getOutput(), Optional.empty()); @@ -655,9 +649,9 @@ public void testPrepareAndExecute() // Add prepared statement to a new session to eliminate any impact on other tests in this suite. Session sessionWithPrepare = Session.builder(getSession()).addPreparedStatement("stmt", selectQuery).build(); - queries.runQueryAndWaitForEvents("EXECUTE stmt USING 'SHIP'", 2, sessionWithPrepare); + queryEvents = queries.runQueryAndWaitForEvents("EXECUTE stmt USING 'SHIP'", sessionWithPrepare).getQueryEvents(); - queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -666,7 +660,7 @@ public void testPrepareAndExecute() assertTrue(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); assertEquals(queryCreatedEvent.getMetadata().getPreparedQuery().get(), selectQuery); - queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getIoMetadata().getOutput(), Optional.empty()); @@ -682,19 +676,19 @@ public void testPrepareAndExecute() public void testOutputStats() throws Exception { - MaterializedResult result = runQueryAndWaitForEvents("SELECT 1 FROM lineitem", 2); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + MaterializedResultWithEvents result = runQueryAndWaitForEvents("SELECT 1 FROM lineitem"); + QueryCreatedEvent queryCreatedEvent = result.getQueryEvents().getQueryCreatedEvent(); + QueryCompletedEvent queryCompletedEvent = result.getQueryEvents().getQueryCompletedEvent(); QueryStats queryStats = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(new QueryId(queryCreatedEvent.getMetadata().getQueryId())).getQueryStats(); assertTrue(queryStats.getOutputDataSize().toBytes() > 0L); assertTrue(queryCompletedEvent.getStatistics().getOutputBytes() > 0L); - assertEquals(result.getRowCount(), queryStats.getOutputPositions()); - assertEquals(result.getRowCount(), queryCompletedEvent.getStatistics().getOutputRows()); + assertEquals(result.getMaterializedResult().getRowCount(), queryStats.getOutputPositions()); + assertEquals(result.getMaterializedResult().getRowCount(), queryCompletedEvent.getStatistics().getOutputRows()); - runQueryAndWaitForEvents("SELECT COUNT(1) FROM lineitem", 2); - queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); - queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + result = runQueryAndWaitForEvents("SELECT COUNT(1) FROM lineitem"); + queryCreatedEvent = result.getQueryEvents().getQueryCreatedEvent(); + queryCompletedEvent = result.getQueryEvents().getQueryCompletedEvent(); queryStats = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(new QueryId(queryCreatedEvent.getMetadata().getQueryId())).getQueryStats(); assertTrue(queryStats.getOutputDataSize().toBytes() > 0L); @@ -756,8 +750,8 @@ public void testOutputColumnsForSelectWithConstantExpression() public void testOutputColumnsForCreateTableAsSelectAll() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("nationkey", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "nationkey"))), @@ -770,8 +764,8 @@ public void testOutputColumnsForCreateTableAsSelectAll() public void testOutputColumnsForCreateTableAsSelectAllFromView() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_view", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_view").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_column", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("mock", "default", "test_view", "test_column")))); @@ -781,8 +775,8 @@ public void testOutputColumnsForCreateTableAsSelectAllFromView() public void testOutputColumnsForCreateTableAsSelectAllFromMaterializedView() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_materialized_view", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS SELECT * FROM mock.default.test_materialized_view").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_column", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("mock", "default", "test_materialized_view", "test_column")))); @@ -792,8 +786,8 @@ public void testOutputColumnsForCreateTableAsSelectAllFromMaterializedView() public void testOutputColumnsForCreateTableAsSelectWithAliasedColumn() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table(aliased_bigint, aliased_varchar) AS SELECT nationkey AS keynation, concat(name, comment) FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table(aliased_bigint, aliased_varchar) AS SELECT nationkey AS keynation, concat(name, comment) FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("aliased_bigint", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "nationkey"))), @@ -1031,8 +1025,8 @@ public void testOutputColumnsWithCorrelatedQueries() public void testOutputColumnsForInsertingSingleColumn() throws Exception { - runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_bigint) SELECT nationkey + 1 AS test_bigint FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_bigint) SELECT nationkey + 1 AS test_bigint FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_bigint", BIGINT_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "nationkey")))); @@ -1042,8 +1036,8 @@ public void testOutputColumnsForInsertingSingleColumn() public void testOutputColumnsForInsertingAliasedColumn() throws Exception { - runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) SELECT name AS aliased_name, nationkey AS aliased_varchar FROM nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) SELECT name AS aliased_name, nationkey AS aliased_varchar FROM nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_varchar", VARCHAR_TYPE, ImmutableSet.of(new ColumnDetail("tpch", "tiny", "nation", "name"))), @@ -1054,8 +1048,8 @@ public void testOutputColumnsForInsertingAliasedColumn() public void testOutputColumnsForUpdatingAllColumns() throws Exception { - runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 'reset', test_bigint = 1", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 'reset', test_bigint = 1").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly( new OutputColumnMetadata("test_varchar", VARCHAR_TYPE, ImmutableSet.of()), @@ -1066,8 +1060,8 @@ public void testOutputColumnsForUpdatingAllColumns() public void testOutputColumnsForUpdatingSingleColumn() throws Exception { - runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 're-reset' WHERE test_bigint = 1", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("UPDATE mock.default.table_for_output SET test_varchar = 're-reset' WHERE test_bigint = 1").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getColumns().get()) .containsExactly(new OutputColumnMetadata("test_varchar", VARCHAR_TYPE, ImmutableSet.of())); } @@ -1076,8 +1070,8 @@ public void testOutputColumnsForUpdatingSingleColumn() public void testCreateTable() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT)", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT)").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); assertThat(event.getIoMetadata().getOutput().get().getTable()).isEqualTo("create_simple_table"); @@ -1089,8 +1083,8 @@ public void testCreateTable() public void testCreateTableLike() throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT, LIKE mock.default.test_table)", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("CREATE TABLE mock.default.create_simple_table (test_column BIGINT, LIKE mock.default.test_table)").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getIoMetadata().getOutput().get().getCatalogName()).isEqualTo("mock"); assertThat(event.getIoMetadata().getOutput().get().getSchema()).isEqualTo("default"); assertThat(event.getIoMetadata().getOutput().get().getTable()).isEqualTo("create_simple_table"); @@ -1105,8 +1099,8 @@ public void testCreateTableLike() public void testConnectorMetrics() throws Exception { - runQueryAndWaitForEvents("SELECT * FROM mock.tiny.nation", 2); - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT * FROM mock.tiny.nation").getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); List connectorMetrics = event.getIoMetadata().getInputs().stream() .map(QueryInputMetadata::getConnectorMetrics) .collect(toImmutableList()); @@ -1149,25 +1143,18 @@ public Object[][] setOperator() private void assertLineage(String baseQuery, Set inputTables, OutputColumnMetadata... outputColumnMetadata) throws Exception { - runQueryAndWaitForEvents("CREATE TABLE mock.default.create_new_table AS " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents("CREATE VIEW mock.default.create_new_view AS " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents("CREATE VIEW mock.default.create_new_materialized_view AS " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) " + baseQuery, 2); - assertLineage(inputTables, outputColumnMetadata); - - runQueryAndWaitForEvents(format("DELETE FROM mock.default.table_for_output WHERE EXISTS (%s) ", baseQuery), 2); - assertLineage(inputTables); + assertLineageInternal("CREATE TABLE mock.default.create_new_table AS " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal("CREATE VIEW mock.default.create_new_view AS " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal("CREATE VIEW mock.default.create_new_materialized_view AS " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal("INSERT INTO mock.default.table_for_output(test_varchar, test_bigint) " + baseQuery, inputTables, outputColumnMetadata); + assertLineageInternal(format("DELETE FROM mock.default.table_for_output WHERE EXISTS (%s) ", baseQuery), inputTables); } - private void assertLineage(Set inputTables, OutputColumnMetadata... outputColumnMetadata) + private void assertLineageInternal(String sql, Set inputTables, OutputColumnMetadata... outputColumnMetadata) + throws Exception { - QueryCompletedEvent event = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryEvents queryEvents = runQueryAndWaitForEvents(sql).getQueryEvents(); + QueryCompletedEvent event = queryEvents.getQueryCompletedEvent(); assertThat(event.getMetadata().getTables()) .map(TestEventListenerBasic::getQualifiedName) .containsExactlyInAnyOrderElementsOf(inputTables); diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java index 294b4f2346c2..8ec8d05875e7 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerWithSplits.java @@ -16,8 +16,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.Duration; import io.trino.Session; import io.trino.connector.MockConnectorFactory; +import io.trino.execution.EventsAwaitingQueries.MaterializedResultWithEvents; +import io.trino.execution.EventsCollector.QueryEvents; import io.trino.execution.TestEventListenerPlugin.TestingEventListenerPlugin; import io.trino.plugin.resourcegroups.ResourceGroupManagerPlugin; import io.trino.plugin.tpch.TpchPlugin; @@ -31,12 +34,10 @@ import io.trino.spi.resourcegroups.QueryType; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.testng.annotations.Test; -import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.Set; @@ -45,6 +46,7 @@ import static io.trino.execution.TestQueues.createResourceGroupId; import static io.trino.plugin.tpch.TpchConnectorFactory.TPCH_SPLITS_PER_NODE; import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -92,7 +94,7 @@ public Iterable getConnectorFactories() queryRunner.getCoordinator().getResourceGroupManager().get() .setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json"))); - queries = new EventsAwaitingQueries(generatedEvents, queryRunner, Duration.ofSeconds(1)); + queries = new EventsAwaitingQueries(generatedEvents, queryRunner); return queryRunner; } @@ -106,12 +108,9 @@ private String getResourceFilePath(String fileName) public void testSplitsForNormalQuery() throws Exception { - // We expect the following events - // QueryCreated: 1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split - int expectedEvents = 1 + 1 + SPLITS_PER_NODE + 1 + 1; - runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem", expectedEvents); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem").getQueryEvents(); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + QueryCreatedEvent queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -119,7 +118,7 @@ public void testSplitsForNormalQuery() assertEquals(queryCreatedEvent.getMetadata().getQuery(), "SELECT sum(linenumber) FROM lineitem"); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getIoMetadata().getOutput(), Optional.empty()); @@ -130,7 +129,7 @@ public void testSplitsForNormalQuery() assertFalse(queryCompletedEvent.getMetadata().getPreparedQuery().isPresent()); assertEquals(queryCompletedEvent.getStatistics().getCompletedSplits(), SPLITS_PER_NODE + 2); - List splitCompletedEvents = generatedEvents.getSplitCompletedEvents(); + List splitCompletedEvents = queryEvents.waitForSplitCompletedEvents(SPLITS_PER_NODE + 2, new Duration(30, SECONDS)); assertEquals(splitCompletedEvents.size(), SPLITS_PER_NODE + 2); // leaf splits + aggregation split // All splits must have the same query ID @@ -145,8 +144,8 @@ public void testSplitsForNormalQuery() .mapToLong(e -> e.getStatistics().getCompletedPositions()) .sum(); - MaterializedResult result = runQueryAndWaitForEvents("SELECT count(*) FROM lineitem", expectedEvents); - long expectedCompletedPositions = (long) result.getMaterializedRows().get(0).getField(0); + MaterializedResultWithEvents result = runQueryAndWaitForEvents("SELECT count(*) FROM lineitem"); + long expectedCompletedPositions = (long) result.getMaterializedResult().getMaterializedRows().get(0).getField(0); assertEquals(actualCompletedPositions, expectedCompletedPositions); QueryStatistics statistics = queryCompletedEvent.getStatistics(); @@ -191,9 +190,9 @@ public void testSplitsForConstantQuery() throws Exception { // QueryCreated: 1, QueryCompleted: 1, Splits: 1 - runQueryAndWaitForEvents("SELECT 1", 3); + QueryEvents queryEvents = runQueryAndWaitForEvents("SELECT 1").getQueryEvents(); - QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + QueryCreatedEvent queryCreatedEvent = queryEvents.getQueryCreatedEvent(); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); assertEquals(queryCreatedEvent.getContext().getServerAddress(), "127.0.0.1"); assertEquals(queryCreatedEvent.getContext().getEnvironment(), "testing"); @@ -202,7 +201,7 @@ public void testSplitsForConstantQuery() assertEquals(queryCreatedEvent.getMetadata().getQuery(), "SELECT 1"); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); - QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); + QueryCompletedEvent queryCompletedEvent = queryEvents.getQueryCompletedEvent(); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); assertEquals(queryCompletedEvent.getStatistics().getTotalRows(), 0L); @@ -211,14 +210,14 @@ public void testSplitsForConstantQuery() assertFalse(queryCompletedEvent.getMetadata().getPreparedQuery().isPresent()); assertEquals(queryCompletedEvent.getContext().getQueryType().get(), QueryType.SELECT); - List splitCompletedEvents = generatedEvents.getSplitCompletedEvents(); + List splitCompletedEvents = queryEvents.waitForSplitCompletedEvents(1, new Duration(30, SECONDS)); assertEquals(splitCompletedEvents.get(0).getQueryId(), queryCompletedEvent.getMetadata().getQueryId()); assertEquals(splitCompletedEvents.get(0).getStatistics().getCompletedPositions(), 1); } - private MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, int numEventsExpected) + private MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql) throws Exception { - return queries.runQueryAndWaitForEvents(sql, numEventsExpected, getSession()); + return queries.runQueryAndWaitForEvents(sql, getSession()); } } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java index effabc67fa5a..e97f3025e861 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestLateMaterializationQueries.java @@ -18,8 +18,8 @@ import io.trino.execution.QueryManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; import io.trino.tests.tpch.TpchQueryRunnerBuilder; import org.intellij.lang.annotations.Language; import org.testng.annotations.Test; @@ -87,10 +87,10 @@ private void assertLazyQuery(@Language("SQL") String sql) { QueryManager queryManager = getDistributedQueryRunner().getCoordinator().getQueryManager(); - ResultWithQueryId workProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(lateMaterialization(), sql); + MaterializedResultWithQueryId workProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(lateMaterialization(), sql); QueryInfo workProcessorQueryInfo = queryManager.getFullQueryInfo(workProcessorResultResultWithQueryId.getQueryId()); - ResultWithQueryId noWorkProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(noLateMaterialization(), sql); + MaterializedResultWithQueryId noWorkProcessorResultResultWithQueryId = getDistributedQueryRunner().executeWithQueryId(noLateMaterialization(), sql); QueryInfo noWorkProcessorQueryInfo = queryManager.getFullQueryInfo(noWorkProcessorResultResultWithQueryId.getQueryId()); // ensure results are correct diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java b/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java index 956050942e7c..03ab4762263c 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java @@ -21,8 +21,7 @@ import io.trino.execution.QueryInfo; import io.trino.execution.QueryManager; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; -import io.trino.testing.ResultWithQueryId; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.tests.tpch.TpchQueryRunnerBuilder; import org.testng.annotations.Test; @@ -186,17 +185,17 @@ public void testMultipleRequiredWorkerNodesSessionOverride() .setCatalog("tpch") .setSchema("tiny") .build(); - ListenableFuture> queryFuture1 = service.submit(() -> queryRunner.executeWithQueryId(session1, "SELECT COUNT(*) from lineitem")); + ListenableFuture queryFuture1 = service.submit(() -> queryRunner.executeWithQueryId(session1, "SELECT COUNT(*) from lineitem")); Session session2 = Session.builder(session1) .setSystemProperty(REQUIRED_WORKERS_COUNT, "3") .build(); - ListenableFuture> queryFuture2 = service.submit(() -> queryRunner.executeWithQueryId(session2, "SELECT COUNT(*) from lineitem")); + ListenableFuture queryFuture2 = service.submit(() -> queryRunner.executeWithQueryId(session2, "SELECT COUNT(*) from lineitem")); Session session3 = Session.builder(session1) .setSystemProperty(REQUIRED_WORKERS_COUNT, "4") .build(); - ListenableFuture> queryFuture3 = service.submit(() -> queryRunner.executeWithQueryId(session3, "SELECT COUNT(*) from lineitem")); + ListenableFuture queryFuture3 = service.submit(() -> queryRunner.executeWithQueryId(session3, "SELECT COUNT(*) from lineitem")); MILLISECONDS.sleep(1000); // None of the queries should run