Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,16 @@ public static void validateAndApplyMetadata(
.collect(Collectors.toList());
final DataType producedDataType =
TypeConversions.fromLogicalToDataType(createProducedType(schema, source));
sourceAbilities.add(
new ReadingMetadataSpec(metadataKeys, (RowType) producedDataType.getLogicalType()));

// Always tell the source which metadata keys to read, even when the list is empty (FLINK-23911)
metadataSource.applyReadableMetadata(metadataKeys, producedDataType);

// Only add a ReadingMetadataSpec when there is at least one metadata key to read
if (!metadataKeys.isEmpty()) {
sourceAbilities.add(
new ReadingMetadataSpec(
metadataKeys, (RowType) producedDataType.getLogicalType()));
}
}

private static void validateScanSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexProgramBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -212,6 +214,15 @@ public List<RelNode> reuseDuplicatedScan(List<RelNode> relNodes) {
ScanTableSource newTableSource =
tableSourceSpec.getScanTableSource(flinkContext, flinkTypeFactory);

// FLINK-23911: Ensure source is told "read zero metadata" even when
// no ReadingMetadataSpec was added (FLINK-38569)
if (newTableSource instanceof SupportsReadingMetadata && allMetaKeys.isEmpty()) {
((SupportsReadingMetadata) newTableSource)
.applyReadableMetadata(
Collections.emptyList(),
TypeConversions.fromLogicalToDataType(newSourceType));
}

TableSourceTable newSourceTable =
pickTable.replace(
newTableSource,
Expand Down Expand Up @@ -278,7 +289,8 @@ private static RowType applyPhysicalAndMetadataPushDown(
sourceAbilitySpecs.add(
new ProjectPushDownSpec(projectedPhysicalFields, newProducedType));
}
if (supportsReadingMeta) {

if (supportsReadingMeta && !usedMetadataNames.isEmpty()) {
sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames, newProducedType));
}
return newProducedType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.table.planner.plan.utils.NestedSchema;
import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
Expand Down Expand Up @@ -86,6 +87,20 @@
public class PushProjectIntoTableSourceScanRule
extends RelRule<PushProjectIntoTableSourceScanRule.Config> {

/**
 * Value holder returned by {@code performPushDown}. It bundles the row type produced after the
 * push-down with the metadata keys that were projected, so that the caller can check whether any
 * metadata keys remain (see FLINK-23911 and FLINK-38569).
 */
private static class PushDownResult {
    /** Row type produced by the source after the projection has been applied. */
    final RowType newProducedType;

    /** Metadata keys kept by the projection; an empty list means no metadata is read. */
    final List<String> projectedMetadataKeys;

    PushDownResult(RowType producedType, List<String> metadataKeys) {
        newProducedType = producedType;
        projectedMetadataKeys = metadataKeys;
    }
}

public static final PushProjectIntoTableSourceScanRule INSTANCE =
new PushProjectIntoTableSourceScanRule(
PushProjectIntoTableSourceScanRule.Config.DEFAULT);
Expand Down Expand Up @@ -168,13 +183,23 @@ public void onMatch(RelOptRuleCall call) {
}

final List<SourceAbilitySpec> abilitySpecs = new ArrayList<>();
final RowType newProducedType =
final PushDownResult result =
performPushDown(sourceTable, projectedSchema, producedType, abilitySpecs);
final RowType newProducedType = result.newProducedType;

final DynamicTableSource newTableSource = sourceTable.tableSource().copy();
final SourceAbilityContext context = SourceAbilityContext.from(scan);
abilitySpecs.forEach(spec -> spec.apply(newTableSource, context));

// FLINK-23911: Ensure copied source is told "read zero metadata" even when
// no ReadingMetadataSpec was added (FLINK-38569)
if (supportsMetadata(sourceTable.tableSource()) && result.projectedMetadataKeys.isEmpty()) {
((SupportsReadingMetadata) newTableSource)
.applyReadableMetadata(
Collections.emptyList(),
TypeConversions.fromLogicalToDataType(newProducedType));
}

final RelDataType newRowType = typeFactory.buildRelNodeRowType(newProducedType);
final TableSourceTable newSource =
sourceTable.copy(
Expand Down Expand Up @@ -260,7 +285,7 @@ private List<RexNode> getPrimaryKeyProjections(LogicalTableScan scan) {
.collect(Collectors.toList());
}

private RowType performPushDown(
private PushDownResult performPushDown(
TableSourceTable source,
NestedSchema projectedSchema,
RowType producedType,
Expand Down Expand Up @@ -328,9 +353,10 @@ private RowType performPushDown(
final RowType newProducedType =
(RowType) Projection.of(projectedFields).project(producedType);

List<String> projectedMetadataKeys = Collections.emptyList();
if (supportsMetadata(source.tableSource())) {
// Use the projected column name to get the metadata key
final List<String> projectedMetadataKeys =
projectedMetadataKeys =
projectedMetadataColumns.stream()
.map(
nestedColumn ->
Expand All @@ -348,10 +374,12 @@ private RowType performPushDown(
.map(col -> col.getMetadataKey().orElse(col.getName()))
.collect(Collectors.toList());

abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
if (!projectedMetadataKeys.isEmpty()) {
abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
}
}

return newProducedType;
return new PushDownResult(newProducedType, projectedMetadataKeys);
}

private List<RexNode> rewriteProjections(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ public class DeltaJoinUtil {
FilterPushDownSpec.class,
ProjectPushDownSpec.class,
PartitionPushDownSpec.class,
// TODO FLINK-38569 ReadingMetadataSpec should not be generated when there are
// no metadata keys to be read
ReadingMetadataSpec.class);

private DeltaJoinUtil() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedSum=[+($3..value, $3
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(id=[$0], nestedName=[$1], nestedSum=[+($2, $3)])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNestedWith._.value, deepNestedWith._nested_.value], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNestedWith._.value, deepNestedWith._nested_.value]]])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -207,7 +207,7 @@ LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedValue=[$2.value], ne
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(id=[$0], nestedName=[$1], nestedValue=[$4], nestedFlag=[$2], nestedNum=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNested_nested2_flag, deepNested_nested2_num, nested_value], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNested_nested2_flag, deepNested_nested2_num, nested_value]]])
]]>
</Resource>
</TestCase>
Expand All @@ -224,7 +224,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, $0).value], EXPR$1=[ITEM($2.Mid.dat
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(EXPR$0=[ITEM($0, $2).value], EXPR$1=[ITEM($1, _UTF-16LE'item').value])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr, Result_Mid_data_map, ID], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr, Result_Mid_data_map, ID]]])
]]>
</Resource>
</TestCase>
Expand All @@ -241,7 +241,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, 2).value], EXPR$1=[ITEM($2.Mid.data
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(EXPR$0=[ITEM($0.data_arr, 2).value], EXPR$1=[ITEM($0.data_arr, $1).value], EXPR$2=[ITEM($0.data_map, _UTF-16LE'item').value], Mid=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid, ID], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid, ID]]])
]]>
</Resource>
</TestCase>
Expand All @@ -258,7 +258,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, 2).value], data_arr=[$2.Mid.data_ar
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(EXPR$0=[ITEM($0, 2).value], data_arr=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr]]])
]]>
</Resource>
</TestCase>
Expand All @@ -275,7 +275,7 @@ LogicalProject(EXPR$0=[CAST(ITEM(CAST($5.result):RecordType:peek_no_expand(Recor
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(EXPR$0=[CAST(ITEM(CAST($0.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart]]])
]]>
</Resource>
</TestCase>
Expand All @@ -292,7 +292,7 @@ LogicalProject(EXPR$0=[ITEM($2.data_arr, $0).value], EXPR$1=[ITEM($2.data_map, _
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(EXPR$0=[ITEM($0.data_arr, $1).value], EXPR$1=[ITEM($0.data_map, _UTF-16LE'item').value], EXPR$2=[ITEM($2, 1)], EXPR$3=[ITEM($2, $1)], EXPR$4=[ITEM($3, _UTF-16LE'item')])
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[Result, ID, outer_array, outer_map], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[Result, ID, outer_array, outer_map]]])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -329,7 +329,7 @@ LogicalProject(id=[$0], EXPR$1=[ITEM($5, _UTF-16LE'e')])
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(id=[$0], EXPR$1=[ITEM($1, _UTF-16LE'e')])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, testMap], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, testMap]]])
]]>
</Resource>
</TestCase>
Expand All @@ -346,7 +346,7 @@ LogicalProject(a=[$0], EXPR$1=[TRIM(FLAG(BOTH), _UTF-16LE' ', $2)])
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], EXPR$1=[TRIM(FLAG(BOTH), _UTF-16LE' ', $1)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c]]])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -433,7 +433,7 @@ LogicalProject(a=[$0], c=[$2])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]])
LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c]]])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ LogicalSink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1])
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1])
+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1], metadata=[]]], fields=[a0, a1])
+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1]]], fields=[a0, a1])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -310,7 +310,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5
+- Calc(select=[a0, a1, null:VARCHAR(2147483647) AS EXPR$2, null:INTEGER AS EXPR$3, b0, b2, b1])
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b1, b2, b0])
:- Exchange(distribution=[hash[a1, a2]])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[>(a0, 1)], project=[a0, a2, a1], metadata=[]]], fields=[a0, a2, a1])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[>(a0, 1)], project=[a0, a2, a1]]], fields=[a0, a2, a1])
+- Exchange(distribution=[hash[b1, b2]])
+- Calc(select=[b1, b2, b0], where=[<(b1, 10)])
+- TableSourceScan(table=[[default_catalog, default_database, src2, filter=[<>(b0, 0)]]], fields=[b0, b2, b1])
Expand Down Expand Up @@ -838,7 +838,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a1, a2]])
: +- Calc(select=[a0, a2, a1], where=[>(a0, RAND(10))])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2]]], fields=[a0, a1, a2])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
]]>
Expand Down Expand Up @@ -908,7 +908,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a1, a2]])
: +- Calc(select=[a0, a2, a1], where=[>(a0, RAND(10))])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2]]], fields=[a0, a1, a2])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
]]>
Expand Down Expand Up @@ -1046,7 +1046,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[4],[5]],
+- Calc(select=[a0, null:DOUBLE AS EXPR$1, null:VARCHAR(2147483647) AS EXPR$2, null:INTEGER AS EXPR$3, b0, b2, null:DOUBLE AS EXPR$6])
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1])
:- Exchange(distribution=[hash[a1, a2]])
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithPartition, partitions=[{pt=1}], project=[a0, a2, a1], metadata=[]]], fields=[a0, a2, a1])
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithPartition, partitions=[{pt=1}], project=[a0, a2, a1]]], fields=[a0, a2, a1])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
]]>
Expand Down Expand Up @@ -1097,7 +1097,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4
+- Calc(select=[a0, a1, a2, null:INTEGER AS EXPR$3, b0, b2, b1])
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1])
:- Exchange(distribution=[hash[a1, a2]])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2]]], fields=[a0, a1, a2])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
]]>
Expand All @@ -1123,7 +1123,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[1],[2],[0],[4
+- Calc(select=[a0 AS a2, a1 AS a0, a2 AS a1, null:INTEGER AS EXPR$3, b0, b2, b1])
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a1, a2, a0, b0, b2, b1])
:- Exchange(distribution=[hash[a1, a2]])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a1, a2, a0], metadata=[]]], fields=[a1, a2, a0])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a1, a2, a0]]], fields=[a1, a2, a0])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
]]>
Expand All @@ -1150,7 +1150,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a1, a2]])
: +- Calc(select=[a0, a1, SUBSTRING(a2, 2) AS a2])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2]]], fields=[a0, a1, a2])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
]]>
Expand All @@ -1177,7 +1177,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1])
:- Exchange(distribution=[hash[a1, a2]])
: +- Calc(select=[a0, a1, SUBSTRING(a2, 2) AS a2])
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithMultiIndexes, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithMultiIndexes, project=[a0, a1, a2]]], fields=[a0, a1, a2])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
]]>
Expand Down