diff --git a/core/trino-main/src/main/java/io/trino/sql/DynamicFilters.java b/core/trino-main/src/main/java/io/trino/sql/DynamicFilters.java index 15b2395b9c29..02e9879121f8 100644 --- a/core/trino-main/src/main/java/io/trino/sql/DynamicFilters.java +++ b/core/trino-main/src/main/java/io/trino/sql/DynamicFilters.java @@ -150,20 +150,16 @@ public static Expression replaceDynamicFilterId(Call dynamicFilterFunctionCall, public static boolean isDynamicFilter(Expression expression) { - return getDescriptor(expression).isPresent(); + return (expression instanceof Call call) && isDynamicFilterFunction(call); } public static Optional getDescriptor(Expression expression) { - if (!(expression instanceof Call call)) { + if (!isDynamicFilter(expression)) { return Optional.empty(); } - if (!isDynamicFilterFunction(call)) { - return Optional.empty(); - } - - List arguments = call.arguments(); + List arguments = ((Call) expression).arguments(); checkArgument(arguments.size() == 4, "invalid arguments count: %s", arguments.size()); Expression probeSymbol = arguments.get(0); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java index a277a2674ddb..0926ba9c65c3 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.trino.Session; +import io.trino.Session.SessionBuilder; import io.trino.connector.MockConnector; import io.trino.connector.MockConnectorColumnHandle; import io.trino.connector.MockConnectorFactory; @@ -63,7 +64,7 @@ import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE; import static io.trino.sql.planner.plan.ExchangeNode.Type.GATHER; import static io.trino.sql.planner.plan.ExchangeNode.Type.REPARTITION; -import static io.trino.testing.TestingSession.testSessionBuilder; +import static io.trino.testing.TestingSession.testSession; public class TestAddLocalExchangesForTaskScaleWriters extends BasePlanTest @@ -73,7 +74,7 @@ public class TestAddLocalExchangesForTaskScaleWriters @Override protected PlanTester createPlanTester() { - PlanTester planTester = PlanTester.create(testSessionBuilder().build()); + PlanTester planTester = PlanTester.create(testSession()); planTester.createCatalog( "mock_with_scaled_writers", createConnectorFactory("mock_with_scaled_writers", true, true), @@ -150,7 +151,7 @@ public void testLocalScaledUnpartitionedWriterDistribution() { assertDistributedPlan( "INSERT INTO unpartitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_without_multiple_writer_per_partition") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") @@ -166,7 +167,7 @@ public void testLocalScaledUnpartitionedWriterDistribution() assertDistributedPlan( "INSERT INTO unpartitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_without_multiple_writer_per_partition") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") @@ -186,7 +187,7 @@ public void testLocalScaledUnpartitionedWriterWithPerTaskScalingDisabled() { assertDistributedPlan( "INSERT INTO unpartitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_without_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") @@ -202,7 +203,7 @@ public void testLocalScaledUnpartitionedWriterWithPerTaskScalingDisabled() assertDistributedPlan( "INSERT INTO unpartitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_without_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") @@ -229,7 +230,7 @@ public void testLocalScaledPartitionedWriterWithoutSupportForMultipleWritersPerP assertDistributedPlan( "INSERT INTO connector_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog(catalogName) .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled)) @@ -257,7 +258,7 @@ public void testLocalScaledPartitionedWriterWithPerTaskScalingDisabled() assertDistributedPlan( "INSERT INTO connector_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog(catalogName) .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled)) @@ -278,7 +279,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre { assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_with_scaled_writers") .setSchema("mock") // Enforce preferred partitioning @@ -296,7 +297,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_with_scaled_writers") .setSchema("mock") // Enforce preferred partitioning @@ -329,7 +330,7 @@ public void testLocalScaledPartitionedWriterForConnectorPartitioning() assertDistributedPlan( "INSERT INTO connector_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog(catalogName) .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") @@ -345,7 +346,7 @@ public void testLocalScaledPartitionedWriterForConnectorPartitioning() assertDistributedPlan( "INSERT INTO connector_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog(catalogName) .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") @@ -365,7 +366,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni { assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") @@ -381,7 +382,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni assertDistributedPlan( "INSERT INTO system_partitioned_table SELECT * FROM source_table", - testSessionBuilder() + testingSessionBuilder() .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") @@ -401,7 +402,7 @@ public void testTableExecuteLocalScalingDisabledForPartitionedTable() { @Language("SQL") String query = "ALTER TABLE system_partitioned_table EXECUTE optimize(file_size_threshold => '10MB')"; - Session session = Session.builder(getPlanTester().getDefaultSession()) + Session session = testingSessionBuilder() .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") @@ -422,7 +423,7 @@ public void testTableExecuteLocalScalingDisabledForUnpartitionedTable() { @Language("SQL") String query = "ALTER TABLE unpartitioned_table EXECUTE optimize(file_size_threshold => '10MB')"; - Session session = Session.builder(getPlanTester().getDefaultSession()) + Session session = testingSessionBuilder() .setCatalog("mock_with_scaled_writers") .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") @@ -437,4 +438,9 @@ public void testTableExecuteLocalScalingDisabledForUnpartitionedTable() exchange(REMOTE, REPARTITION, SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION, node(TableScanNode.class)))))); } + + private SessionBuilder testingSessionBuilder() + { + return Session.builder(getPlanTester().getDefaultSession()); + } } diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 077f753de4a9..98887431cd7f 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -569,6 +569,12 @@ test + + io.trino + trino-tpcds + test + + io.trino trino-tpch diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index c3868bb37e95..698658916e7e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -27,6 +27,7 @@ import io.trino.plugin.iceberg.catalog.rest.TestingPolarisCatalog; import io.trino.plugin.iceberg.containers.NessieContainer; import io.trino.plugin.iceberg.containers.UnityCatalogContainer; +import io.trino.plugin.tpcds.TpcdsPlugin; import io.trino.plugin.tpch.TpchPlugin; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; @@ -85,6 +86,7 @@ public static class Builder private Optional metastoreDirectory = Optional.empty(); private ImmutableMap.Builder icebergProperties = ImmutableMap.builder(); private Optional schemaInitializer = Optional.of(SchemaInitializer.builder().build()); + private boolean tpcdsCatalogEnabled; protected Builder() { @@ -145,6 +147,12 @@ public Builder disableSchemaInitializer() return self(); } + public Builder setTpcdsCatalogEnabled(boolean tpcdsCatalogEnabled) + { + this.tpcdsCatalogEnabled = tpcdsCatalogEnabled; + return self(); + } + @Override public DistributedQueryRunner build() throws Exception @@ -154,6 +162,11 @@ public DistributedQueryRunner build() queryRunner.installPlugin(new TpchPlugin()); queryRunner.createCatalog("tpch", "tpch"); + if (tpcdsCatalogEnabled) { + queryRunner.installPlugin(new TpcdsPlugin()); + queryRunner.createCatalog("tpcds", "tpcds"); + } + if (!icebergProperties.buildOrThrow().containsKey("fs.hadoop.enabled")) { icebergProperties.put("fs.hadoop.enabled", "true"); } @@ -172,6 +185,13 @@ public DistributedQueryRunner build() } } + private static Builder icebergQueryRunnerMainBuilder() + { + return IcebergQueryRunner.builder() + .addCoordinatorProperty("http-server.http.port", "8080") + .setTpcdsCatalogEnabled(true); + } + public static final class IcebergRestQueryRunnerMain { private IcebergRestQueryRunnerMain() {} @@ -192,8 +212,7 @@ public static void main(String[] args) testServer.start(); @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .setBaseDataDir(Optional.of(warehouseLocation.toPath())) .setIcebergProperties(ImmutableMap.of( "iceberg.catalog.type", "rest", @@ -221,8 +240,7 @@ public static void main(String[] args) TestingPolarisCatalog polarisCatalog = new TestingPolarisCatalog(warehouseLocation.getPath()); @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .setBaseDataDir(Optional.of(warehouseLocation.toPath())) .addIcebergProperty("iceberg.catalog.type", "rest") .addIcebergProperty("iceberg.rest-catalog.uri", polarisCatalog.restUri() + "/api/catalog") @@ -282,8 +300,7 @@ public static void main(String[] args) // Requires AWS credentials, which can be provided any way supported by the DefaultProviderChain // See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .setIcebergProperties(ImmutableMap.of("iceberg.catalog.type", "glue")) .build(); @@ -349,8 +366,7 @@ public static void main(String[] args) minio.createBucket(bucketName); @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .setIcebergProperties(Map.of( "iceberg.catalog.type", "TESTING_FILE_METASTORE", "hive.metastore.catalog.dir", "s3://%s/".formatted(bucketName), @@ -403,8 +419,7 @@ public static void main(String[] args) hiveHadoop.start(); @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .setIcebergProperties(Map.of( "iceberg.catalog.type", "HIVE_METASTORE", "hive.metastore.uri", hiveHadoop.getHiveMetastoreEndpoint().toString(), @@ -439,8 +454,7 @@ public static void main(String[] args) TestingIcebergJdbcServer server = new TestingIcebergJdbcServer(); @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .setIcebergProperties(ImmutableMap.builder() .put("iceberg.catalog.type", "jdbc") .put("iceberg.jdbc-catalog.driver-class", "org.postgresql.Driver") @@ -467,8 +481,7 @@ public static void main(String[] args) throws Exception { @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .setIcebergProperties(ImmutableMap.builder() .put("iceberg.catalog.type", "snowflake") .put("fs.native-s3.enabled", "true") @@ -535,8 +548,7 @@ public static void main(String[] args) metastoreDir.deleteOnExit(); @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .addIcebergProperty("hive.metastore.catalog.dir", metastoreDir.toURI().toString()) .setInitialTables(TpchTable.getTables()) .build(); @@ -564,8 +576,7 @@ public static void main(String[] args) metastoreDir.deleteOnExit(); @SuppressWarnings("resource") - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addCoordinatorProperty("http-server.http.port", "8080") + QueryRunner queryRunner = icebergQueryRunnerMainBuilder() .addIcebergProperty("hive.metastore.catalog.dir", metastoreDir.toURI().toString()) .setExtraProperties(ImmutableMap.builder() .put("retry-policy", "TASK") diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeCompatibility.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeCompatibility.java index e8aa07208f5d..6bca55c902b7 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeCompatibility.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeCompatibility.java @@ -107,7 +107,7 @@ protected String getConfigurationDirectory(String dockerImage) private String getConfigFileFor(String dockerImage) { if (getVersionFromDockerImageName(dockerImage) < 369) { - return "config-with-system-memory.properties"; + return "config-pre369.properties"; } return "config.properties"; } @@ -115,7 +115,7 @@ private String getConfigFileFor(String dockerImage) private String getHiveConfigFor(String dockerImage) { if (getVersionFromDockerImageName(dockerImage) < 359) { - return "hive-hadoop2.properties"; + return "hive-pre359.properties"; } return "hive.properties"; } @@ -123,7 +123,7 @@ private String getHiveConfigFor(String dockerImage) private String getIcebergConfigFor(String dockerImage) { if (getVersionFromDockerImageName(dockerImage) < 359) { - return "iceberg_old.properties"; + return "iceberg-pre359.properties"; } return "iceberg.properties"; } diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/config-with-system-memory.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/config-pre369.properties similarity index 100% rename from testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/config-with-system-memory.properties rename to testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/config-pre369.properties diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/hive-hadoop2.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/hive-pre359.properties similarity index 100% rename from testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/hive-hadoop2.properties rename to testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/hive-pre359.properties diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/iceberg_old.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/iceberg-pre359.properties similarity index 100% rename from testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/iceberg_old.properties rename to testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/singlenode-compatibility/iceberg-pre359.properties diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestQueryManager.java b/testing/trino-tests/src/test/java/io/trino/tests/TestQueryManager.java index 214e854c6a54..b5178d0692df 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestQueryManager.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestQueryManager.java @@ -15,7 +15,6 @@ import io.opentelemetry.api.trace.Span; import io.trino.Session; -import io.trino.client.ClientCapabilities; import io.trino.dispatcher.DispatchManager; import io.trino.execution.QueryInfo; import io.trino.execution.QueryManager; @@ -31,18 +30,14 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.parallel.Execution; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.SessionTestUtils.TEST_SESSION; import static io.trino.execution.QueryRunnerUtil.createQuery; import static io.trino.execution.QueryRunnerUtil.waitForQueryState; import static io.trino.execution.QueryState.FAILED; import static io.trino.execution.QueryState.RUNNING; -import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.spi.StandardErrorCode.EXCEEDED_CPU_LIMIT; import static io.trino.spi.StandardErrorCode.EXCEEDED_SCAN_LIMIT; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; -import static io.trino.testing.TestingSession.testSessionBuilder; -import static java.util.Arrays.stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.fail; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @@ -110,7 +105,8 @@ public void testQueryScanExceeded() throws Exception { try (QueryRunner queryRunner = TpchQueryRunner.builder().addExtraProperty("query.max-scan-physical-bytes", "0B").build()) { - QueryId queryId = createQuery(queryRunner, TEST_SESSION, "SELECT * FROM system.runtime.nodes"); + Session session = queryRunner.getDefaultSession(); + QueryId queryId = createQuery(queryRunner, session, "SELECT * FROM system.runtime.nodes"); waitForQueryState(queryRunner, queryId, FAILED); QueryManager queryManager = queryRunner.getCoordinator().getQueryManager(); BasicQueryInfo queryInfo = queryManager.getQueryInfo(queryId); @@ -125,12 +121,7 @@ public void testQueryScanExceededSession() throws Exception { try (QueryRunner queryRunner = TpchQueryRunner.builder().build()) { - Session session = testSessionBuilder() - .setCatalog("tpch") - .setSchema(TINY_SCHEMA_NAME) - .setClientCapabilities(stream(ClientCapabilities.values()) - .map(ClientCapabilities::toString) - .collect(toImmutableSet())) + Session session = Session.builder(queryRunner.getDefaultSession()) .setSystemProperty("query_max_scan_physical_bytes", "0B") .build(); QueryId queryId = createQuery(queryRunner, session, "SELECT * FROM system.runtime.nodes");