@@ -28,9 +28,11 @@ import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall
 import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone, fullDeleteOrNone, DELETE_BY_KEY}
 import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver
+import org.apache.flink.table.planner.plan.schema.TableSourceTable
 import org.apache.flink.table.planner.plan.utils._
 import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -43,9 +45,11 @@ import org.apache.flink.types.RowKind
 import org.apache.calcite.linq4j.Ord
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.RexCall
+import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.util.ImmutableBitSet

+import java.util.Collections
+
 import scala.collection.JavaConversions._

 /** An optimize program to infer ChangelogMode for every physical node. */
@@ -656,26 +660,31 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti

       case join: StreamPhysicalJoin =>
         val onlyAfterByParent = requiredUpdateTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER
-        val children = join.getInputs.zipWithIndex.map {
-          case (child, childOrdinal) =>
-            val physicalChild = child.asInstanceOf[StreamPhysicalRel]
-            val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal)
-            val inputModifyKindSet = getModifyKindSet(physicalChild)
-            if (onlyAfterByParent) {
-              if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) {
-                // the parent requires only-after, however, the join doesn't support this
-                None
-              } else {
-                this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet))
-              }
-            } else {
-              this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet))
-            }
-        }
-        if (children.exists(_.isEmpty)) {
+        if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) {
+          // FLINK-38579: non-equi condition on non-upsert key requires UPDATE_BEFORE
           None
         } else {
-          createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait)
+          val children = join.getInputs.zipWithIndex.map {
+            case (child, childOrdinal) =>
+              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+              val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal)
+              val inputModifyKindSet = getModifyKindSet(physicalChild)
+              if (onlyAfterByParent) {
+                if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) {
+                  // the parent requires only-after, however, the join doesn't support this
+                  None
+                } else {
+                  this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet))
+                }
+              } else {
+                this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet))
+              }
+          }
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait)
+          }
         }

       case temporalJoin: StreamPhysicalTemporalJoin =>
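
The gate added above is the heart of FLINK-38579 on the update path: if either join input can produce updates and the non-equi part of the join condition reads a column that no upsert key covers, requesting ONLY_UPDATE_AFTER would lose retractions. A minimal plain-Scala sketch of the failure mode, not part of the patch (names and the predicate v > 10 are illustrative):

final case class Row(kind: String, key: Int, v: Int)

// Toy model of one join side: emit a match for rows passing the non-equi
// predicate, and a retraction when an UPDATE_BEFORE passes it.
def emits(rows: Seq[Row]): Seq[String] = rows.collect {
  case Row("+I", k, v) if v > 10 => s"+I ($k, $v)"
  case Row("-UB", k, v) if v > 10 => s"-D ($k, $v)" // retracts the stale match
  case Row("+UA", k, v) if v > 10 => s"+I ($k, $v)"
}

emits(Seq(Row("+I", 1, 42), Row("-UB", 1, 42), Row("+UA", 1, 5)))
// Seq("+I (1, 42)", "-D (1, 42)") -- the old match is retracted
emits(Seq(Row("+I", 1, 42), Row("+UA", 1, 5))) // UPDATE_BEFORE was dropped
// Seq("+I (1, 42)") -- (1, 42) no longer joins, yet is never retracted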
@@ -796,16 +805,24 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       case ts: StreamPhysicalTableSourceScan =>
         // currently only support BEFORE_AND_AFTER if source produces updates
         val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
-        val newSource = createNewNode(rel, Some(List()), providedTrait)
         if (
-          providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
-          requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
+          requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
+          hasNonUpsertKeyFilterPushedDown(ts)
         ) {
-          // requiring only-after, but the source is CDC source, then drop update_before manually
-          val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
-          createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait)
+          // FLINK-38579: filter on non-upsert key requires UPDATE_BEFORE
+          None
         } else {
-          newSource
+          val newSource = createNewNode(rel, Some(List()), providedTrait)
+          if (
+            providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
+            requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
+          ) {
+            // requiring only-after, but the source is CDC source, then drop update_before manually
+            val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
+            createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait)
+          } else {
+            newSource
+          }
         }

       case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
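
The same hazard exists for sources once a filter has been pushed into the scan: if the filtered column is not covered by an upsert key, the +UA row of an update may be filtered out while the previously emitted row survives downstream, so the UPDATE_BEFORE must be kept. A hedged sketch (plain Scala, illustrative names, mirroring the pushed filter >(a3, 1) from the plan tests below):

final case class SrcRow(kind: String, a0: Int, a3: Int) // a0 is the upsert key

def scanWithPushedFilter(rows: Seq[SrcRow]): Seq[SrcRow] = rows.filter(_.a3 > 1)

scanWithPushedFilter(Seq(SrcRow("+I", 1, 2), SrcRow("-UB", 1, 2), SrcRow("+UA", 1, 0)))
// keeps +I and -UB: the -UB reaches downstream and retracts (1, 2)
scanWithPushedFilter(Seq(SrcRow("+I", 1, 2), SrcRow("+UA", 1, 0))) // UB dropped
// keeps only +I: downstream state for key 1 stays stale forever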
@@ -1187,27 +1204,35 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         createNewNode(process, Some(children), providedDeleteTrait)

       case join: StreamPhysicalJoin =>
-        val children = join.getInputs.zipWithIndex.map {
-          case (child, childOrdinal) =>
-            val physicalChild = child.asInstanceOf[StreamPhysicalRel]
-            val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
-            val inputModifyKindSet = getModifyKindSet(physicalChild)
-            if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) {
-              this
-                .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
-                .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)))
-            } else {
-              this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
-            }
-        }
-        if (children.exists(_.isEmpty)) {
+        if (
+          requiredTrait == DeleteKindTrait.DELETE_BY_KEY &&
+          hasNonUpsertKeyNonEquiCondition(join)
+        ) {
+          // FLINK-38579: non-equi condition on non-upsert key requires full DELETE
           None
         } else {
-          val childRels = children.flatten.toList
-          if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) {
-            createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel)))
+          val children = join.getInputs.zipWithIndex.map {
+            case (child, childOrdinal) =>
+              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+              val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
+              val inputModifyKindSet = getModifyKindSet(physicalChild)
+              if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) {
+                this
+                  .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
+                  .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)))
+              } else {
+                this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
+              }
+          }
+          if (children.exists(_.isEmpty)) {
+            None
           } else {
-            createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel)))
+            val childRels = children.flatten.toList
+            if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) {
+              createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel)))
+            } else {
+              createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel)))
+            }
           }
         }
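
On the delete path the reasoning is symmetric: a DELETE_BY_KEY row carries reliable values only for the upsert-key fields, so a non-equi condition reading any other column cannot be evaluated against it, and full DELETE rows are required. A small sketch, not part of the patch (field names follow the tests; a3 is modelled as absent in a key-only delete):

final case class KeyOnlyDelete(a1: Int, a2: Int, a3: Option[Int])

def nonEquiHolds(d: KeyOnlyDelete, b0: Int): Option[Boolean] = d.a3.map(_ > b0)

nonEquiHolds(KeyOnlyDelete(1, 2, None), b0 = 10) // None: the join cannot decide
nonEquiHolds(KeyOnlyDelete(1, 2, Some(42)), b0 = 10) // Some(true): full DELETE works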

@@ -1316,7 +1341,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       case ts: StreamPhysicalTableSourceScan =>
         // currently only support BEFORE_AND_AFTER if source produces updates
         val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
-        createNewNode(rel, Some(List()), providedTrait)
+        if (
+          requiredTrait == DeleteKindTrait.DELETE_BY_KEY &&
+          hasNonUpsertKeyFilterPushedDown(ts)
+        ) {
+          // FLINK-38579: filter on non-upsert key requires full DELETE
+          None
+        } else {
+          createNewNode(rel, Some(List()), providedTrait)
+        }

       case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
         _: StreamPhysicalValues =>
@@ -1482,6 +1515,90 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
     }
   }

+  private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: Seq[RexNode]): Boolean = {
+    if (rexNodes.isEmpty) {
+      return false
+    }
+
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(node.getCluster.getMetadataQuery)
+    val upsertKeys = fmq.getUpsertKeys(node)
+
+    if (upsertKeys == null || upsertKeys.isEmpty) {
+      return true
+    }
+
+    val fieldRefIndices = ImmutableBitSet.of(
+      RexNodeExtractor.extractRefInputFields(JavaScalaConversionUtil.toJava(rexNodes)): _*)
+
+    !upsertKeys.exists(upsertKey => upsertKey.contains(fieldRefIndices))
+  }
+
+  private def hasNonUpsertKeyFilterPushedDown(ts: StreamPhysicalTableSourceScan): Boolean = {
+    val tableSourceTable = ts.getTable.unwrap(classOf[TableSourceTable])
+    if (tableSourceTable == null) {
+      return false
+    }
+
+    val filterSpec = tableSourceTable.abilitySpecs
+      .collectFirst { case spec: FilterPushDownSpec => spec }
+
+    filterSpec match {
+      case Some(spec) =>
+        val predicates = JavaScalaConversionUtil.toScala(spec.getPredicates)
+        referencesNonUpsertKeyColumns(ts, predicates)
+      case None => false
+    }
+  }
+
+  private def hasNonUpsertKeyNonEquiCondition(join: StreamPhysicalJoin): Boolean = {
+    val nonEquiCondOpt = join.joinSpec.getNonEquiCondition
+    if (!nonEquiCondOpt.isPresent) {
+      return false
+    }
+
+    // Only block UB drop if inputs actually produce UPDATE messages
+    // For batch sources with changelogMode=[I], there are no updates, so it's safe to allow UB drop
+    val leftModifyKindSet = getModifyKindSet(join.getLeft)
+    val rightModifyKindSet = getModifyKindSet(join.getRight)
+    if (
+      !leftModifyKindSet.contains(ModifyKind.UPDATE) &&
+      !rightModifyKindSet.contains(ModifyKind.UPDATE)
+    ) {
+      return false // No updates produced, safe to allow UB drop
+    }
+
+    val condition = nonEquiCondOpt.get()
+    val referencedFields = ImmutableBitSet.of(
+      RexNodeExtractor.extractRefInputFields(Collections.singletonList(condition)): _*)
+
+    if (referencedFields.isEmpty) {
+      return false
+    }
+
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(join.getCluster.getMetadataQuery)
+    val leftFieldCount = join.getLeft.getRowType.getFieldCount
+
+    val leftBuilder = ImmutableBitSet.builder()
+    val rightBuilder = ImmutableBitSet.builder()
+    referencedFields.foreach {
+      idx =>
+        if (idx < leftFieldCount) leftBuilder.set(idx)
+        else rightBuilder.set(idx - leftFieldCount)
+    }
+
+    val leftRefs = leftBuilder.build()
+    val rightRefs = rightBuilder.build()
+
+    def referencesNonUpsert(input: RelNode, fields: ImmutableBitSet): Boolean = {
+      if (fields.isEmpty) return false
+      val upsertKeys = fmq.getUpsertKeys(input)
+      upsertKeys == null || upsertKeys.isEmpty || !upsertKeys.exists(_.contains(fields))
+    }
+
+    referencesNonUpsert(join.getLeft, leftRefs) ||
+    referencesNonUpsert(join.getRight, rightRefs)
+  }
+
   private def getModifyKindSet(node: RelNode): ModifyKindSet = {
     val modifyKindSetTrait = node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
     modifyKindSetTrait.modifyKindSet
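
Both helpers reduce to one bitset test: a predicate is safe only if some upsert key covers every input field it references, and a null or empty result from getUpsertKeys is treated conservatively as a non-upsert-key reference. A sketch of that test against Calcite's ImmutableBitSet API (the key and field indices are illustrative):

import org.apache.calcite.util.ImmutableBitSet

val upsertKeys = Seq(ImmutableBitSet.of(1, 2)) // upsert key {a1, a2}
val keyOnlyRefs = ImmutableBitSet.of(1) // predicate reads a1 only
val nonKeyRefs = ImmutableBitSet.of(1, 3) // predicate also reads a3

upsertKeys.exists(_.contains(keyOnlyRefs)) // true: covered, safe
upsertKeys.exists(_.contains(nonKeyRefs)) // false: needs UB / full DELETE

The expected-plan updates below follow directly: with the new gates the optimizer no longer inserts DropUpdateBefore under these joins, so the plans keep changelogMode=[I,UB,UA] from source to join.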
@@ -334,13 +334,11 @@ LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0
 <Resource name="optimized rel plan">
 <![CDATA[
 Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, a2, a3, b0, b2, b1], changelogMode=[NONE])
-+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), >(a3, b0))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA])
-   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
-   :  +- DropUpdateBefore(changelogMode=[I,UA])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
-   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
-      +- DropUpdateBefore(changelogMode=[I,UA])
-         +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), >(a3, b0))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UB,UA])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
 ]]>
 </Resource>
 </TestCase>
@@ -387,13 +385,11 @@ LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0
 <Resource name="optimized rel plan">
 <![CDATA[
 Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, a2, a3, b0, b2, b1], changelogMode=[NONE])
-+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA])
-   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
-   :  +- DropUpdateBefore(changelogMode=[I,UA])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1, filter=[>(a3, 1)]]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
-   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
-      +- DropUpdateBefore(changelogMode=[I,UA])
-         +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UB,UA])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1, filter=[>(a3, 1)]]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
 ]]>
 </Resource>
 </TestCase>
@@ -1390,15 +1390,13 @@ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
 :     +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
-:        +- DropUpdateBefore
-:           +- Calc(select=[a, b, op_ts])
-:              +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
+:        +- Calc(select=[a, b, op_ts])
+:           +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
 +- Exchange(distribution=[hash[a]])
    +- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
       +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
-         +- DropUpdateBefore
-            +- Calc(select=[a, c, op_ts])
-               +- Reused(reference_id=[1])
+         +- Calc(select=[a, c, op_ts])
+            +- Reused(reference_id=[1])
 ]]>
 </Resource>
 </TestCase>
@@ -1429,15 +1427,13 @@
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
 :     +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
-:        +- DropUpdateBefore
-:           +- Calc(select=[a, b, op_ts])
-:              +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
+:        +- Calc(select=[a, b, op_ts])
+:           +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
 +- Exchange(distribution=[hash[a]])
    +- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
       +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
-         +- DropUpdateBefore
-            +- Calc(select=[a, c, op_ts])
-               +- Reused(reference_id=[1])
+         +- Calc(select=[a, c, op_ts])
+            +- Reused(reference_id=[1])
 ]]>
 </Resource>
 </TestCase>