diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java new file mode 100644 index 000000000000..e2eb7ed0ff1c --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestRemoveEmptyMergeWriterRuleSet.java @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.planner.iterative.rule; + +import io.trino.spi.connector.CatalogHandle; +import io.trino.spi.connector.SchemaTableName; +import io.trino.sql.planner.Partitioning; +import io.trino.sql.planner.PartitioningScheme; +import io.trino.sql.planner.Symbol; +import io.trino.sql.planner.iterative.Rule; +import io.trino.sql.planner.iterative.rule.test.BaseRuleTest; +import io.trino.sql.planner.iterative.rule.test.PlanBuilder; +import io.trino.sql.planner.plan.Assignments; +import io.trino.sql.planner.plan.ExchangeNode; +import io.trino.sql.planner.plan.PlanNode; +import io.trino.sql.planner.plan.TableFinishNode; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.List; + +import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; +import static io.trino.sql.planner.assertions.PlanMatchPattern.values; +import static io.trino.sql.planner.iterative.rule.RemoveEmptyMergeWriterRuleSet.removeEmptyMergeWriterRule; 
+import static io.trino.sql.planner.iterative.rule.RemoveEmptyMergeWriterRuleSet.removeEmptyMergeWriterWithExchangeRule; + +public class TestRemoveEmptyMergeWriterRuleSet + extends BaseRuleTest +{ + private CatalogHandle catalogHandle; + private SchemaTableName schemaTableName; + + @BeforeClass + public void setup() + { + catalogHandle = tester().getCurrentCatalogHandle(); + schemaTableName = new SchemaTableName("schema", "table"); + } + + @Test + public void testRemoveEmptyMergeRewrite() + { + testRemoveEmptyMergeRewrite(removeEmptyMergeWriterRule(), false); + } + + @Test + public void testRemoveEmptyMergeRewriteWithExchange() + { + testRemoveEmptyMergeRewrite(removeEmptyMergeWriterWithExchangeRule(), true); + } + + private void testRemoveEmptyMergeRewrite(Rule<?> rule, boolean planWithExchange) + { + tester().assertThat(rule) + .on(p -> { + Symbol mergeRow = p.symbol("merge_row"); + Symbol rowId = p.symbol("row_id"); + Symbol rowCount = p.symbol("row_count"); + + PlanNode merge = p.merge( + schemaTableName, + p.exchange(e -> e + .addSource( + p.project( + Assignments.builder() + .putIdentity(mergeRow) + .putIdentity(rowId) + .putIdentity(rowCount) + .build(), + p.values(mergeRow, rowId, rowCount))) + .addInputsSet(mergeRow, rowId, rowCount) + .partitioningScheme( + new PartitioningScheme( + Partitioning.create(SINGLE_DISTRIBUTION, List.of()), + List.of(mergeRow, rowId, rowCount)))), + mergeRow, + rowId, + List.of(rowCount)); + return p.tableFinish( + planWithExchange ? 
withExchange(p, merge, rowCount) : merge, + p.createTarget(catalogHandle, schemaTableName, true, true), + rowCount); + }) + .matches(values("A")); + } + + private ExchangeNode withExchange(PlanBuilder planBuilder, PlanNode source, Symbol symbol) + { + return planBuilder.exchange(e -> e + .addSource(source) + .addInputsSet(symbol) + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, List.of()), List.of(symbol)))); + } +} diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index 05da7c5a291c..2dfe735377a7 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -685,10 +685,20 @@ public TableScanNode build() } } - public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode source, Symbol rowCountSymbol, PartitioningScheme partitioningScheme) + public TableFinishNode tableFinish(PlanNode source, WriterTarget target, Symbol rowCountSymbol) { return new TableFinishNode( idAllocator.getNextId(), + source, + target, + rowCountSymbol, + Optional.empty(), + Optional.empty()); + } + + public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode source, Symbol rowCountSymbol) + { + return tableFinish( exchange(e -> e .addSource(tableWriter( ImmutableList.of(rowCountSymbol), @@ -699,11 +709,9 @@ public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode sou source, rowCountSymbol)) .addInputsSet(rowCountSymbol) - .partitioningScheme(partitioningScheme)), + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(rowCountSymbol)))), target, - rowCountSymbol, - Optional.empty(), - Optional.empty()); + rowCountSymbol); } public CreateTarget 
createTarget(CatalogHandle catalogHandle, SchemaTableName schemaTableName, boolean reportingWrittenBytesSupported, boolean multipleWritersPerPartitionSupported, OptionalInt maxWriterTasks) diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java index e56002badcde..1bd4c6f225c6 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java @@ -124,8 +124,7 @@ public void testScaledWritersUsedAndTargetSupportsIt(PartitioningHandle scaledWr .source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true, true), tableWriterSource, - symbol, - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); validatePlan(root); } @@ -146,8 +145,7 @@ public void testScaledWritersUsedAndTargetDoesNotSupportReportingWrittenBytes(Pa .source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false, true), tableWriterSource, - symbol, - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); assertThatThrownBy(() -> validatePlan(root)) .isInstanceOf(IllegalStateException.class) .hasMessage("The scaled writer partitioning scheme is set but writer target no_bytes_written_reported:INSTANCE doesn't support reporting physical written bytes"); @@ -176,8 +174,7 @@ public void testScaledWritersWithMultipleSourceExchangesAndTargetDoesNotSupportR .source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false, true), tableWriterSource, - symbol, - new 
PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); assertThatThrownBy(() -> validatePlan(root)) .isInstanceOf(IllegalStateException.class) .hasMessage("The scaled writer partitioning scheme is set but writer target no_bytes_written_reported:INSTANCE doesn't support reporting physical written bytes"); @@ -206,8 +203,7 @@ public void testScaledWritersWithMultipleSourceExchangesAndTargetSupportIt(Parti .source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true, true), tableWriterSource, - symbol, - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); validatePlan(root); } @@ -228,8 +224,7 @@ public void testScaledWritersUsedAboveTableWriterInThePlanTree(PartitioningHandl .source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false, true), tableWriterSource, - symbol, - new PartitioningScheme(Partitioning.create(scaledWriterPartitionHandle, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); validatePlan(root); } @@ -257,8 +252,7 @@ public void testScaledWritersTwoTableWritersNodes(PartitioningHandle scaledWrite .source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false, true), tableWriterSource, - symbol, - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); assertThatThrownBy(() -> validatePlan(root)) .isInstanceOf(IllegalStateException.class) .hasMessage("The scaled writer partitioning scheme is set but writer target no_bytes_written_reported:INSTANCE doesn't support reporting physical written bytes"); @@ -281,8 +275,7 @@ public void testScaledWriterUsedAndTargetDoesNotSupportMultipleWritersPerPartiti 
.source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, true, false), tableWriterSource, - symbol, - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); if (scaledWriterPartitionHandle == SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION) { validatePlan(root); @@ -317,8 +310,7 @@ public void testScaledWriterWithMultipleSourceExchangesAndTargetDoesNotSupportMu .source(planBuilder.tableWithExchangeCreate( planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, true, false), tableWriterSource, - symbol, - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + symbol))); if (scaledWriterPartitionHandle == SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION) { validatePlan(root);