diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java index b575f0916a09b..af74bed5c00be 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java @@ -498,9 +498,16 @@ public static void validateAndApplyMetadata( .collect(Collectors.toList()); final DataType producedDataType = TypeConversions.fromLogicalToDataType(createProducedType(schema, source)); - sourceAbilities.add( - new ReadingMetadataSpec(metadataKeys, (RowType) producedDataType.getLogicalType())); + + // Apply metadata setting to source (FLINK-23911) metadataSource.applyReadableMetadata(metadataKeys, producedDataType); + + // Only add ReadingMetadataSpec if non-empty + if (!metadataKeys.isEmpty()) { + sourceAbilities.add( + new ReadingMetadataSpec( + metadataKeys, (RowType) producedDataType.getLogicalType())); + } } private static void validateScanSource( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 8da14bb06f25f..b7932cf654180 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -35,6 +35,7 @@ import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rex.RexBuilder; @@ -42,6 +43,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -212,6 +214,15 @@ public List reuseDuplicatedScan(List relNodes) { ScanTableSource newTableSource = tableSourceSpec.getScanTableSource(flinkContext, flinkTypeFactory); + // FLINK-23911: Ensure source is told "read zero metadata" even when + // no ReadingMetadataSpec was added (FLINK-38569) + if (newTableSource instanceof SupportsReadingMetadata && allMetaKeys.isEmpty()) { + ((SupportsReadingMetadata) newTableSource) + .applyReadableMetadata( + Collections.emptyList(), + TypeConversions.fromLogicalToDataType(newSourceType)); + } + TableSourceTable newSourceTable = pickTable.replace( newTableSource, @@ -278,7 +289,8 @@ private static RowType applyPhysicalAndMetadataPushDown( sourceAbilitySpecs.add( new ProjectPushDownSpec(projectedPhysicalFields, newProducedType)); } - if (supportsReadingMeta) { + + if (supportsReadingMeta && !usedMetadataNames.isEmpty()) { sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames, newProducedType)); } return newProducedType; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java index 4c3ae91a512dd..cd48382be6aa7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java @@ -40,6 +40,7 @@ import org.apache.flink.table.planner.plan.utils.NestedSchema; import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; @@ -86,6 +87,20 @@ public class PushProjectIntoTableSourceScanRule extends RelRule { + /** + * Result of performPushDown containing the new type and metadata keys for + * FLINK-23911/FLINK-38569. + */ + private static class PushDownResult { + final RowType newProducedType; + final List projectedMetadataKeys; + + PushDownResult(RowType newProducedType, List projectedMetadataKeys) { + this.newProducedType = newProducedType; + this.projectedMetadataKeys = projectedMetadataKeys; + } + } + public static final PushProjectIntoTableSourceScanRule INSTANCE = new PushProjectIntoTableSourceScanRule( PushProjectIntoTableSourceScanRule.Config.DEFAULT); @@ -168,13 +183,23 @@ public void onMatch(RelOptRuleCall call) { } final List abilitySpecs = new ArrayList<>(); - final RowType newProducedType = + final PushDownResult result = performPushDown(sourceTable, projectedSchema, producedType, abilitySpecs); + final RowType newProducedType = result.newProducedType; final DynamicTableSource newTableSource = sourceTable.tableSource().copy(); final SourceAbilityContext context = SourceAbilityContext.from(scan); abilitySpecs.forEach(spec -> spec.apply(newTableSource, context)); + // FLINK-23911: Ensure copied source is told "read zero metadata" even when + // no ReadingMetadataSpec was added (FLINK-38569) + if (supportsMetadata(sourceTable.tableSource()) && result.projectedMetadataKeys.isEmpty()) { + ((SupportsReadingMetadata) newTableSource) + .applyReadableMetadata( + Collections.emptyList(), + TypeConversions.fromLogicalToDataType(newProducedType)); + } + final RelDataType newRowType = typeFactory.buildRelNodeRowType(newProducedType); final TableSourceTable newSource = sourceTable.copy( @@ -260,7 +285,7 @@ private List getPrimaryKeyProjections(LogicalTableScan scan) { .collect(Collectors.toList()); } - private RowType performPushDown( + private PushDownResult performPushDown( TableSourceTable source, NestedSchema projectedSchema, RowType producedType, @@ -328,9 +353,10 @@ private RowType performPushDown( final RowType newProducedType = (RowType) Projection.of(projectedFields).project(producedType); + List projectedMetadataKeys = Collections.emptyList(); if (supportsMetadata(source.tableSource())) { // Use the projected column name to get the metadata key - final List projectedMetadataKeys = + projectedMetadataKeys = projectedMetadataColumns.stream() .map( nestedColumn -> @@ -348,10 +374,12 @@ private RowType performPushDown( .map(col -> col.getMetadataKey().orElse(col.getName())) .collect(Collectors.toList()); - abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType)); + if (!projectedMetadataKeys.isEmpty()) { + abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType)); + } } - return newProducedType; + return new PushDownResult(newProducedType, projectedMetadataKeys); } private List rewriteProjections( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java index d433ba5ed9df0..dc574849ff559 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java @@ -113,8 +113,6 @@ public class DeltaJoinUtil { FilterPushDownSpec.class, ProjectPushDownSpec.class, PartitionPushDownSpec.class, - // TODO FLINK-38569 ReadingMetadataSpec should not be generated when there are - // no metadata keys to be read ReadingMetadataSpec.class); private DeltaJoinUtil() {} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml index 92ec23fe38c73..4abfd8e0f3d82 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml @@ -67,7 +67,7 @@ LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedSum=[+($3..value, $3 @@ -207,7 +207,7 @@ LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedValue=[$2.value], ne @@ -224,7 +224,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, $0).value], EXPR$1=[ITEM($2.Mid.dat @@ -241,7 +241,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, 2).value], EXPR$1=[ITEM($2.Mid.data @@ -258,7 +258,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, 2).value], data_arr=[$2.Mid.data_ar @@ -275,7 +275,7 @@ LogicalProject(EXPR$0=[CAST(ITEM(CAST($5.result):RecordType:peek_no_expand(Recor @@ -292,7 +292,7 @@ LogicalProject(EXPR$0=[ITEM($2.data_arr, $0).value], EXPR$1=[ITEM($2.data_map, _ @@ -329,7 +329,7 @@ LogicalProject(id=[$0], EXPR$1=[ITEM($5, _UTF-16LE'e')]) @@ -346,7 +346,7 @@ LogicalProject(a=[$0], EXPR$1=[TRIM(FLAG(BOTH), _UTF-16LE' ', $2)]) @@ -433,7 +433,7 @@ LogicalProject(a=[$0], c=[$2]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml index cda6b9dbcda09..edb722baefe15 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml @@ -168,7 +168,7 @@ LogicalSink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1]) @@ -310,7 +310,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5 +- Calc(select=[a0, a1, null:VARCHAR(2147483647) AS EXPR$2, null:INTEGER AS EXPR$3, b0, b2, b1]) +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b1, b2, b0]) :- Exchange(distribution=[hash[a1, a2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[>(a0, 1)], project=[a0, a2, a1], metadata=[]]], fields=[a0, a2, a1]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[>(a0, 1)], project=[a0, a2, a1]]], fields=[a0, a2, a1]) +- Exchange(distribution=[hash[b1, b2]]) +- Calc(select=[b1, b2, b0], where=[<(b1, 10)]) +- TableSourceScan(table=[[default_catalog, default_database, src2, filter=[<>(b0, 0)]]], fields=[b0, b2, b1]) @@ -838,7 +838,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5 +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a1, a2]]) : +- Calc(select=[a0, a2, a1], where=[>(a0, RAND(10))]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2]]], fields=[a0, a1, a2]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) ]]> @@ -908,7 +908,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5 +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a1, a2]]) : +- Calc(select=[a0, a2, a1], where=[>(a0, RAND(10))]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2]]], fields=[a0, a1, a2]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) ]]> @@ -1046,7 +1046,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[4],[5]], +- Calc(select=[a0, null:DOUBLE AS EXPR$1, null:VARCHAR(2147483647) AS EXPR$2, null:INTEGER AS EXPR$3, b0, b2, null:DOUBLE AS EXPR$6]) +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1]) :- Exchange(distribution=[hash[a1, a2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1WithPartition, partitions=[{pt=1}], project=[a0, a2, a1], metadata=[]]], fields=[a0, a2, a1]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1WithPartition, partitions=[{pt=1}], project=[a0, a2, a1]]], fields=[a0, a2, a1]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) ]]> @@ -1097,7 +1097,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4 +- Calc(select=[a0, a1, a2, null:INTEGER AS EXPR$3, b0, b2, b1]) +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1]) :- Exchange(distribution=[hash[a1, a2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2]]], fields=[a0, a1, a2]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) ]]> @@ -1123,7 +1123,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[1],[2],[0],[4 +- Calc(select=[a0 AS a2, a1 AS a0, a2 AS a1, null:INTEGER AS EXPR$3, b0, b2, b1]) +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a1, a2, a0, b0, b2, b1]) :- Exchange(distribution=[hash[a1, a2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a1, a2, a0], metadata=[]]], fields=[a1, a2, a0]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a1, a2, a0]]], fields=[a1, a2, a0]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) ]]> @@ -1150,7 +1150,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4 +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a1, a2]]) : +- Calc(select=[a0, a1, SUBSTRING(a2, 2) AS a2]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2]]], fields=[a0, a1, a2]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) ]]> @@ -1177,7 +1177,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4 +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1]) :- Exchange(distribution=[hash[a1, a2]]) : +- Calc(select=[a0, a1, SUBSTRING(a2, 2) AS a2]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1WithMultiIndexes, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1WithMultiIndexes, project=[a0, a1, a2]]], fields=[a0, a1, a2]) +- Exchange(distribution=[hash[b1, b2]]) +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1]) ]]>