diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java index d02aba0d64c83..84ce89bfb668c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java @@ -177,16 +177,6 @@ private String getDigest(Sink sink) { List digest = new ArrayList<>(); digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString()); - int[][] targetColumns = sink.targetColumns(); - if (targetColumns != null && targetColumns.length > 0) { - digest.add( - "targetColumns=[" - + Arrays.stream(targetColumns) - .map(Arrays::toString) - .collect(Collectors.joining(",")) - + "]"); - } - String fieldTypes = sink.getRowType().getFieldList().stream() .map(f -> f.getType().toString()) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala index 8279be977ca5f..74f8893dc0e50 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala @@ -80,7 +80,9 @@ abstract class Sink( .getOrElse(Array.empty[Array[Int]]) .map(_.mkString("[", ",", "]")) .mkString(","), - targetColumns != null + // only print target columns when the sink supports TargetColumnWriting + targetColumns != null && abilitySpecs.exists( + spec => spec.isInstanceOf[TargetColumnWritingSpec]) ) .item("fields", getRowType.getFieldNames.mkString(", ")) .itemIf("hints", RelExplainUtil.hintsToString(hints), !hints.isEmpty) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java index 7101e129918e8..b29f1620ae7f2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java @@ -43,6 +43,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; +import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting; import org.apache.flink.table.connector.sink.abilities.SupportsTruncate; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; @@ -323,7 +324,7 @@ private static class TestScanContext implements RowLevelModificationScanContext /** A sink that supports row-level update. */ private static class SupportsRowLevelUpdateSink - implements DynamicTableSink, SupportsRowLevelUpdate { + implements DynamicTableSink, SupportsRowLevelUpdate, SupportsTargetColumnWriting { protected final ObjectIdentifier tableIdentifier; protected final ResolvedCatalogTable resolvedCatalogTable; @@ -447,6 +448,12 @@ public RowLevelUpdateMode getRowLevelUpdateMode() { } }; } + + @Override + public boolean applyTargetColumns(int[][] targetColumns) { + // Implement SupportsTargetColumnWriting for the compatibility of existing test cases + return true; + } } /** A sink that supports row-level delete/update. */ diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java index c9ba3a205a992..37725a72d6c45 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java @@ -20,7 +20,6 @@ import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.planner.plan.reuse.SinkReuser; import org.apache.flink.table.planner.utils.TableTestBase; import org.apache.flink.table.planner.utils.TableTestUtil; @@ -35,8 +34,6 @@ public abstract class SinkReuseTestBase extends TableTestBase { @BeforeEach protected void setup() { TableConfig tableConfig = TableConfig.getDefault(); - tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true); - tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true); util = getTableTestUtil(tableConfig); util.tableEnv() @@ -153,14 +150,25 @@ public void testSinkReuseFromSameSource() { } @Test - public void testSinkReuseWithPartialColumns() { + public void testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting() { StatementSet statementSet = util.tableEnv().createStatementSet(); + // sink1 has not implemented the SupportsTargetColumnWriting sink ability statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source1)"); statementSet.addInsertSql("INSERT INTO sink1(`y`) (SELECT y FROM source1)"); statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source3)"); util.verifyExecPlan(statementSet); } + @Test + public void testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting() { + StatementSet statementSet = util.tableEnv().createStatementSet(); + // sink2 has implemented the SupportsTargetColumnWriting sink ability + statementSet.addInsertSql("INSERT INTO sink2(`x`) (SELECT x FROM source1)"); + statementSet.addInsertSql("INSERT INTO sink2(`y`) (SELECT y FROM source1)"); + statementSet.addInsertSql("INSERT INTO sink2(`x`) (SELECT x FROM source3)"); + util.verifyExecPlan(statementSet); + } + @Test public void testSinkReuseWithOverwrite() { StatementSet statementSet = util.tableEnv().createStatementSet(); diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml index f3c33920f699d..f37bea76cc878 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml @@ -148,18 +148,47 @@ Sink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPT ]]> - + + + + + + + + + @@ -168,14 +197,14 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/StreamSinkReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/StreamSinkReuseTest.xml index 59198a2e151a9..270ccc571c2b5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/StreamSinkReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/StreamSinkReuseTest.xml @@ -148,18 +148,47 @@ Sink(table=[default_catalog.default_database.sink1], fields=[x, y]) ]]> - + + + + + + + + + @@ -168,14 +197,14 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]],