diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 237ba7edea1ac..9ed186b5a58cb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -28,9 +28,11 @@ import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone, fullDeleteOrNone, DELETE_BY_KEY}
import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver
+import org.apache.flink.table.planner.plan.schema.TableSourceTable
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -43,9 +45,11 @@ import org.apache.flink.types.RowKind
import org.apache.calcite.linq4j.Ord
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.RexCall
+import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.util.ImmutableBitSet
+import java.util.Collections
+
import scala.collection.JavaConversions._
/** An optimize program to infer ChangelogMode for every physical node. */
@@ -656,26 +660,31 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case join: StreamPhysicalJoin =>
val onlyAfterByParent = requiredUpdateTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER
- val children = join.getInputs.zipWithIndex.map {
- case (child, childOrdinal) =>
- val physicalChild = child.asInstanceOf[StreamPhysicalRel]
- val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal)
- val inputModifyKindSet = getModifyKindSet(physicalChild)
- if (onlyAfterByParent) {
- if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) {
- // the parent requires only-after, however, the join doesn't support this
- None
- } else {
- this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet))
- }
- } else {
- this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet))
- }
- }
- if (children.exists(_.isEmpty)) {
+ if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) {
+ // FLINK-38579: non-equi condition on non-upsert key requires UPDATE_BEFORE
None
} else {
- createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait)
+ val children = join.getInputs.zipWithIndex.map {
+ case (child, childOrdinal) =>
+ val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+ val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal)
+ val inputModifyKindSet = getModifyKindSet(physicalChild)
+ if (onlyAfterByParent) {
+ if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) {
+ // the parent requires only-after, however, the join doesn't support this
+ None
+ } else {
+ this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet))
+ }
+ } else {
+ this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet))
+ }
+ }
+ if (children.exists(_.isEmpty)) {
+ None
+ } else {
+ createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait)
+ }
}
case temporalJoin: StreamPhysicalTemporalJoin =>
@@ -796,16 +805,24 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case ts: StreamPhysicalTableSourceScan =>
// currently only support BEFORE_AND_AFTER if source produces updates
val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
- val newSource = createNewNode(rel, Some(List()), providedTrait)
if (
- providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
- requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
+ requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
+ hasNonUpsertKeyFilterPushedDown(ts)
) {
- // requiring only-after, but the source is CDC source, then drop update_before manually
- val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
- createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait)
+ // FLINK-38579: filter on non-upsert key requires UPDATE_BEFORE
+ None
} else {
- newSource
+ val newSource = createNewNode(rel, Some(List()), providedTrait)
+ if (
+ providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
+ requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
+ ) {
+ // requiring only-after, but the source is CDC source, then drop update_before manually
+ val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
+ createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait)
+ } else {
+ newSource
+ }
}
case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
@@ -1187,27 +1204,35 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(process, Some(children), providedDeleteTrait)
case join: StreamPhysicalJoin =>
- val children = join.getInputs.zipWithIndex.map {
- case (child, childOrdinal) =>
- val physicalChild = child.asInstanceOf[StreamPhysicalRel]
- val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
- val inputModifyKindSet = getModifyKindSet(physicalChild)
- if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) {
- this
- .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
- .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)))
- } else {
- this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
- }
- }
- if (children.exists(_.isEmpty)) {
+ if (
+ requiredTrait == DeleteKindTrait.DELETE_BY_KEY &&
+ hasNonUpsertKeyNonEquiCondition(join)
+ ) {
+ // FLINK-38579: non-equi condition on non-upsert key requires full DELETE
None
} else {
- val childRels = children.flatten.toList
- if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) {
- createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel)))
+ val children = join.getInputs.zipWithIndex.map {
+ case (child, childOrdinal) =>
+ val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+ val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
+ val inputModifyKindSet = getModifyKindSet(physicalChild)
+ if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) {
+ this
+ .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
+ .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)))
+ } else {
+ this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
+ }
+ }
+ if (children.exists(_.isEmpty)) {
+ None
} else {
- createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel)))
+ val childRels = children.flatten.toList
+ if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) {
+ createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel)))
+ } else {
+ createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel)))
+ }
}
}
@@ -1316,7 +1341,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case ts: StreamPhysicalTableSourceScan =>
// currently only support BEFORE_AND_AFTER if source produces updates
val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
- createNewNode(rel, Some(List()), providedTrait)
+ if (
+ requiredTrait == DeleteKindTrait.DELETE_BY_KEY &&
+ hasNonUpsertKeyFilterPushedDown(ts)
+ ) {
+ // FLINK-38579: filter on non-upsert key requires full DELETE
+ None
+ } else {
+ createNewNode(rel, Some(List()), providedTrait)
+ }
case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
_: StreamPhysicalValues =>
@@ -1482,6 +1515,90 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
}
}
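+ /**
+  * Checks whether the columns referenced by the given expressions are covered by an upsert key of
+  * the node. Returns true if they are not fully covered by any single upsert key, and
+  * conservatively true when no upsert keys are known.
+  */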
+ private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: Seq[RexNode]): Boolean = {
+ if (rexNodes.isEmpty) {
+ return false
+ }
+
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(node.getCluster.getMetadataQuery)
+ val upsertKeys = fmq.getUpsertKeys(node)
+
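+ // No upsert key information available: conservatively assume non-upsert-key columns are referenced.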
+ if (upsertKeys == null || upsertKeys.isEmpty) {
+ return true
+ }
+
+ val fieldRefIndices = ImmutableBitSet.of(
+ RexNodeExtractor.extractRefInputFields(JavaScalaConversionUtil.toJava(rexNodes)): _*)
+
+ !upsertKeys.exists(upsertKey => upsertKey.contains(fieldRefIndices))
+ }
+
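+ /**
+  * Checks whether a filter pushed into the source (via FilterPushDownSpec) references columns
+  * outside the scan's upsert keys. Such a filter needs UPDATE_BEFORE / full DELETE messages
+  * to be evaluated correctly (FLINK-38579).
+  */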
+ private def hasNonUpsertKeyFilterPushedDown(ts: StreamPhysicalTableSourceScan): Boolean = {
+ val tableSourceTable = ts.getTable.unwrap(classOf[TableSourceTable])
+ if (tableSourceTable == null) {
+ return false
+ }
+
+ val filterSpec = tableSourceTable.abilitySpecs
+ .collectFirst { case spec: FilterPushDownSpec => spec }
+
+ filterSpec match {
+ case Some(spec) =>
+ val predicates = JavaScalaConversionUtil.toScala(spec.getPredicates)
+ referencesNonUpsertKeyColumns(ts, predicates)
+ case None => false
+ }
+ }
+
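+ /**
+  * Checks whether the join has a non-equi condition that references non-upsert-key columns of its
+  * inputs while at least one input produces UPDATE messages. In that case UPDATE_BEFORE / full
+  * DELETE messages must not be dropped (FLINK-38579).
+  */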
+ private def hasNonUpsertKeyNonEquiCondition(join: StreamPhysicalJoin): Boolean = {
+ val nonEquiCondOpt = join.joinSpec.getNonEquiCondition
+ if (!nonEquiCondOpt.isPresent) {
+ return false
+ }
+
+ // Only block dropping UPDATE_BEFORE if the inputs actually produce UPDATE messages.
+ // For insert-only inputs (changelogMode=[I]) there are no updates, so it's safe to allow the UB drop.
+ val leftModifyKindSet = getModifyKindSet(join.getLeft)
+ val rightModifyKindSet = getModifyKindSet(join.getRight)
+ if (
+ !leftModifyKindSet.contains(ModifyKind.UPDATE) &&
+ !rightModifyKindSet.contains(ModifyKind.UPDATE)
+ ) {
+ return false // No updates produced, safe to allow UB drop
+ }
+
+ val condition = nonEquiCondOpt.get()
+ val referencedFields = ImmutableBitSet.of(
+ RexNodeExtractor.extractRefInputFields(Collections.singletonList(condition)): _*)
+
+ if (referencedFields.isEmpty) {
+ return false
+ }
+
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(join.getCluster.getMetadataQuery)
+ val leftFieldCount = join.getLeft.getRowType.getFieldCount
+
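+ // The non-equi condition addresses fields of the concatenated [left | right] join row,
+ // so split the referenced indices per input before checking them against the upsert keys.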
+ val leftBuilder = ImmutableBitSet.builder()
+ val rightBuilder = ImmutableBitSet.builder()
+ referencedFields.foreach {
+ idx =>
+ if (idx < leftFieldCount) leftBuilder.set(idx)
+ else rightBuilder.set(idx - leftFieldCount)
+ }
+
+ val leftRefs = leftBuilder.build()
+ val rightRefs = rightBuilder.build()
+
+ def referencesNonUpsert(input: RelNode, fields: ImmutableBitSet): Boolean = {
+ if (fields.isEmpty) return false
+ val upsertKeys = fmq.getUpsertKeys(input)
+ upsertKeys == null || upsertKeys.isEmpty || !upsertKeys.exists(_.contains(fields))
+ }
+
+ referencesNonUpsert(join.getLeft, leftRefs) ||
+ referencesNonUpsert(join.getRight, rightRefs)
+ }
+
private def getModifyKindSet(node: RelNode): ModifyKindSet = {
val modifyKindSetTrait = node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
modifyKindSetTrait.modifyKindSet
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index cda6b9dbcda09..a1f05b3d764ef 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -334,13 +334,11 @@ LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0
(a3, b0))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA])
- :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
- : +- DropUpdateBefore(changelogMode=[I,UA])
- : +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
- +- DropUpdateBefore(changelogMode=[I,UA])
- +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), >(a3, b0))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UB,UA])
+ : +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UB,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
]]>
@@ -387,13 +385,11 @@ LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0
(a3, 1)]]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
- +- DropUpdateBefore(changelogMode=[I,UA])
- +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UB,UA])
+ : +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1, filter=[>(a3, 1)]]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UB,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index b54ca5ed03598..103acca062fa6 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -1390,15 +1390,13 @@ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
: +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
- : +- DropUpdateBefore
- : +- Calc(select=[a, b, op_ts])
- : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
+ : +- Calc(select=[a, b, op_ts])
+ : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
+- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
- +- DropUpdateBefore
- +- Calc(select=[a, c, op_ts])
- +- Reused(reference_id=[1])
+ +- Calc(select=[a, c, op_ts])
+ +- Reused(reference_id=[1])
]]>
@@ -1429,15 +1427,13 @@ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
: +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
- : +- DropUpdateBefore
- : +- Calc(select=[a, b, op_ts])
- : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
+ : +- Calc(select=[a, b, op_ts])
+ : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts])
+- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
- +- DropUpdateBefore
- +- Calc(select=[a, c, op_ts])
- +- Reused(reference_id=[1])
+ +- Calc(select=[a, c, op_ts])
+ +- Reused(reference_id=[1])
]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index f6d9db54b946a..fdc7359e21b1d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -908,17 +908,17 @@ LogicalProject(a1=[$1], b1=[$3])
(a2, b2))], select=[a2, a1, b2, b1], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
- :- Exchange(distribution=[hash[a1]], changelogMode=[I,UA])
- : +- Calc(select=[a2, a1], changelogMode=[I,UA])
- : +- GroupAggregate(groupBy=[a1], select=[a1, SUM(a2) AS a2], changelogMode=[I,UA])
+Calc(select=[a1, b1], changelogMode=[I,UB,UA,D])
++- Join(joinType=[LeftOuterJoin], where=[AND(=(a1, b1), >(a2, b2))], select=[a2, a1, b2, b1], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+ :- Exchange(distribution=[hash[a1]], changelogMode=[I,UB,UA])
+ : +- Calc(select=[a2, a1], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[a1], select=[a1, SUM(a2) AS a2], changelogMode=[I,UB,UA])
: +- Exchange(distribution=[hash[a1]], changelogMode=[I])
: +- Calc(select=[a1, a2], changelogMode=[I])
: +- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3], changelogMode=[I])
- +- Exchange(distribution=[hash[b1]], changelogMode=[I,UA])
- +- Calc(select=[b2, b1], changelogMode=[I,UA])
- +- GroupAggregate(groupBy=[b1], select=[b1, SUM(b2) AS b2], changelogMode=[I,UA])
+ +- Exchange(distribution=[hash[b1]], changelogMode=[I,UB,UA])
+ +- Calc(select=[b2, b1], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[b1], select=[b1, SUM(b2) AS b2], changelogMode=[I,UB,UA])
+- Exchange(distribution=[hash[b1]], changelogMode=[I])
+- Calc(select=[b1, b2], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b1, b2, b3], changelogMode=[I])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
index 4ac4e6ba4248c..9922802b44076 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.utils.LegacyRowExtension
import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
import org.apache.flink.types.{Row, RowKind}
-import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
import org.junit.jupiter.api.{BeforeEach, TestTemplate}
import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension}
@@ -434,6 +434,189 @@ class ChangelogSourceITCase(
}
}
+ @TestTemplate
+ def testFilterPushedDownOnNonUpsertKey(): Unit = {
+ // FLINK-38579: Filter pushed down to source on non-upsert key should require UPDATE_BEFORE
+
+ val testDataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), "tom", Int.box(1)),
+ changelogRow("-U", Int.box(1), "tom", Int.box(1)),
+ changelogRow("+U", Int.box(1), "tom", Int.box(2))
+ ))
+ tEnv.executeSql(s"""
+ |CREATE TABLE t (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$testDataId',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'filterable-fields' = 'c'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE s (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false',
+ | 'sink-changelog-mode-enforced' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ // CDC source with duplicate events + MiniBatch is incompatible here: ChangelogNormalize
+ // (needed for CDC deduplication) requires ONLY_UPDATE_AFTER from the source, while the
+ // filter on a non-upsert key requires UPDATE_BEFORE
+ if (sourceMode == CHANGELOG_SOURCE_WITH_EVENTS_DUPLICATE && miniBatch == MiniBatchOn) {
+ assertThatThrownBy(() => tEnv.executeSql("insert into s select * from t where c < 2"))
+ .isInstanceOf(classOf[org.apache.flink.table.api.TableException])
+ .hasMessageContaining("Can't generate a valid execution plan")
+ return
+ }
+
+ tEnv.executeSql("insert into s select * from t where c < 2").await()
+
+ // The result should be empty because:
+ // - Filter c < 2 matches only c=1
+ // - The record (1,tom,1) was inserted (+I) and then retracted by the update-before (-U)
+ // - With the fix, UPDATE_BEFORE is preserved and correctly deletes the record
+ // - Without the fix, UPDATE_BEFORE would be dropped, leaving (1,tom,1) in the result
+ val expected = List[String]()
+ assertThat(TestValuesTableFactory.getResultsAsStrings("s").sorted).isEqualTo(expected.sorted)
+ }
+
+ @TestTemplate
+ def testJoinWithNonEquivConditionOnNonUpsertKey(): Unit = {
+ // FLINK-38579: Join with a non-equi condition on a non-upsert key should require UPDATE_BEFORE
+
+ val t1DataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), Int.box(10)),
+ changelogRow("-U", Int.box(1), Int.box(10)),
+ changelogRow("+U", Int.box(1), Int.box(20))
+ ))
+ val t2DataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), Int.box(100))
+ ))
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE t1 (
+ | pk int primary key not enforced,
+ | val int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$t1DataId',
+ | 'changelog-mode' = 'I,UA,UB,D'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE t2 (
+ | pk int primary key not enforced,
+ | val int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$t2DataId',
+ | 'changelog-mode' = 'I'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE s (
+ | pk1 int,
+ | val1 int,
+ | pk2 int,
+ | val2 int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false',
+ | 'sink-changelog-mode-enforced' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ tEnv
+ .executeSql("insert into s select * from t1 join t2 on t1.pk = t2.pk and t1.val < 15")
+ .await()
+
+ // The result should be empty because:
+ // - The non-equi condition t1.val < 15 matches the old row (1, 10)
+ // - But (1, 10) was retracted by -U when it was updated to (1, 20)
+ // - With the fix, UPDATE_BEFORE is preserved and correctly deletes the joined record
+ // - Without the fix, UPDATE_BEFORE would be dropped, leaving (1,10,1,100) in the result
+ val expected = List[String]()
+ assertThat(TestValuesTableFactory.getResultsAsStrings("s").sorted).isEqualTo(expected.sorted)
+ }
+
+ @TestTemplate
+ def testJoinWithNonEquivConditionOnRightNonUpsertKey(): Unit = {
+ // FLINK-38579: Test that non-equi condition on RIGHT side non-upsert key is correctly detected
+ // This validates the left/right split logic in hasNonUpsertKeyNonEquiCondition
+
+ val t1DataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), Int.box(10))
+ ))
+ val t2DataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), Int.box(100)),
+ changelogRow("-U", Int.box(1), Int.box(100)),
+ changelogRow("+U", Int.box(1), Int.box(200))
+ ))
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE t1 (
+ | pk int primary key not enforced,
+ | val int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$t1DataId',
+ | 'changelog-mode' = 'I'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE t2 (
+ | pk int primary key not enforced,
+ | val int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$t2DataId',
+ | 'changelog-mode' = 'I,UA,UB'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE s (
+ | pk1 int,
+ | val1 int,
+ | pk2 int,
+ | val2 int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false',
+ | 'sink-changelog-mode-enforced' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ tEnv
+ .executeSql("insert into s select * from t1 join t2 on t1.pk = t2.pk and t2.val < 150")
+ .await()
+
+ // The result should be empty because:
+ // - The non-equi condition t2.val < 150 matches the old row (1, 100)
+ // - But the t2 row was updated from val=100 to val=200
+ // - The (1, 100) row was retracted by -U, then (1, 200) was added by +U
+ // - With the fix, UPDATE_BEFORE is preserved and correctly removes the joined record
+ // - Without the fix, UPDATE_BEFORE would be dropped, leaving (1,10,1,100) in the result
+ val expected = List[String]()
+ assertThat(TestValuesTableFactory.getResultsAsStrings("s").sorted).isEqualTo(expected.sorted)
+ }
+
}
object ChangelogSourceITCase {