diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
index ea52022297c82..e9be436b5ed46 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
@@ -44,6 +44,7 @@ import scala.collection.JavaConversions._
* for the standard logical algebra.
*/
class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
+ private val MaxGeneratedEnrichedKeys = 128
override def getDef: MetadataDef[UpsertKeys] = UpsertKeys.DEF
@@ -348,10 +349,12 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
val leftKeys = fmq.getUpsertKeys(left)
val rightKeys = fmq.getUpsertKeys(right)
+ val leftFieldCount = left.getRowType.getFieldCount
- FlinkRelMdUniqueKeys.INSTANCE.getJoinUniqueKeys(
+ // First compute the base join upsert keys; column indexes are in the joined row's coordinate space
+ val baseKeys = FlinkRelMdUniqueKeys.INSTANCE.getJoinUniqueKeys(
joinRelType,
- left.getRowType.getFieldCount,
+ leftFieldCount,
// Retain only keys whose columns are contained in the join's equi-join columns
// (the distribution keys), ensuring the result remains an upsert key.
// Note: An Exchange typically applies this filtering already via fmq.getUpsertKeys(...).
@@ -361,6 +364,97 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
isSideUnique(leftKeys, joinInfo.leftSet),
isSideUnique(rightKeys, joinInfo.rightSet)
)
+
+ // Enrich the keys by substituting equivalent columns from equi-join conditions
+ // The base keys are in joined output space, so enrichment works directly
+ enrichJoinedKeys(baseKeys, joinInfo, joinRelType, leftFieldCount)
+ }
+
+ /**
+ * Enriches join result keys by substituting columns with their equivalents from equi-join
+ * conditions.
+ *
+ * @param keys
+ * The upsert keys in joined output coordinate space
+ * @param joinInfo
+ * The join information containing equi-join column pairs
+ * @param joinRelType
+ * The join type (to check nullability constraints)
+ * @param leftFieldCount
+ * The number of fields from the left side
+ * @return
+ * The enriched set of upsert keys
+ */
+ private def enrichJoinedKeys(
+ keys: JSet[ImmutableBitSet],
+ joinInfo: JoinInfo,
+ joinRelType: JoinRelType,
+ leftFieldCount: Int): JSet[ImmutableBitSet] = {
+ val pairs = joinInfo.leftKeys.zip(joinInfo.rightKeys).map {
+ case (l, r) => (l.intValue(), r.intValue() + leftFieldCount)
+ }
+ enrichKeysWithEquivalences(keys, pairs, joinRelType)
+ }
+
+ /**
+ * Core enrichment logic: for each key and each column equivalence pair, generates enriched
+ * versions by substituting one column with its equivalent.
+ *
+ * For example, if a key is {a2, b2} and there's an equivalence a1 = a2, this generates the
+ * additional key {a1, b2}.
+ *
+ * The enrichment respects join type nullability by controlling substitution directions:
+ * - Right→Left (replace right col with left): only if left side is never NULL
+ * - Left→Right (replace left col with right): only if right side is never NULL
+ *
+ * This prevents invalid keys where the substituted column might be NULL, causing the remaining
+ * columns (which may not be unique by themselves) to incorrectly appear as a valid key.
+ *
+ * @param keys
+ * The upsert keys to enrich
+ * @param equivalentPairs
+ * Column equivalence pairs (leftCol, rightCol) in joined output coordinate space
+ * @param joinRelType
+ * The join type (determines allowed substitution directions)
+ * @return
+ * The enriched set of upsert keys (includes the original keys, truncated to at most MaxGeneratedEnrichedKeys entries, so seed keys may be dropped for very large inputs)
+ */
+ private def enrichKeysWithEquivalences(
+ keys: JSet[ImmutableBitSet],
+ equivalentPairs: java.lang.Iterable[(Int, Int)],
+ joinRelType: JoinRelType): JSet[ImmutableBitSet] = {
+
+ if (keys == null) return null
+
+ val allowRightToLeft = !joinRelType.generatesNullsOnLeft()
+ val allowLeftToRight = !joinRelType.generatesNullsOnRight()
+
+ val seen = new util.HashSet[ImmutableBitSet](keys.size() * 2)
+ val queue = new util.ArrayDeque[ImmutableBitSet]()
+
+ @inline def enqueue(k: ImmutableBitSet): Unit =
+ if (seen.size() < MaxGeneratedEnrichedKeys && seen.add(k)) queue.add(k)
+
+ @inline def expand(key: ImmutableBitSet): Unit = {
+ val it = equivalentPairs.iterator()
+ while (it.hasNext) {
+ val (l, r) = it.next()
+ if (allowRightToLeft && key.get(r)) enqueue(key.clear(r).set(l))
+ if (allowLeftToRight && key.get(l)) enqueue(key.clear(l).set(r))
+ }
+ }
+
+ // seed
+ val seedIt = keys.iterator()
+ while (seedIt.hasNext) enqueue(seedIt.next())
+
+ // fixpoint
+ while (!queue.isEmpty) {
+ expand(queue.poll())
+ if (seen.size() >= MaxGeneratedEnrichedKeys) return seen
+ }
+
+ seen
}
def getUpsertKeys(rel: SetOp, mq: RelMetadataQuery): JSet[ImmutableBitSet] =
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index f6d9db54b946a..879d41e2b0aeb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -774,6 +774,190 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
index 40e38dda16120..29806a06b607a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
@@ -320,7 +320,7 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase {
@Test
def testGetUpsertKeysOnJoin(): Unit = {
assertEquals(
- toBitSet(Array(1), Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)),
+ toBitSet(Array(1), Array(5), Array(1, 5), Array(1, 6), Array(5, 6), Array(1, 5, 6)),
mq.getUpsertKeys(logicalInnerJoinOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinNotOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinOnRHSUniqueKeys).toSet)
@@ -328,7 +328,7 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase {
assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinWithEquiAndNonEquiCond).toSet)
assertEquals(
- toBitSet(Array(1), Array(1, 5), Array(1, 5, 6)),
+ toBitSet(Array(1), Array(1, 5), Array(1, 6), Array(1, 5, 6)),
mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinNotOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinOnRHSUniqueKeys).toSet)
@@ -336,7 +336,7 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase {
assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinWithEquiAndNonEquiCond).toSet)
assertEquals(
- toBitSet(Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)),
+ toBitSet(Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)), // unchanged: right join allows only left->right substitution, which yields no new keys here
mq.getUpsertKeys(logicalRightJoinOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinNotOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinOnLHSUniqueKeys).toSet)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index ea527aebe0285..9e1b58be7434c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -686,4 +686,302 @@ class JoinTest extends TableTestBase {
"Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) must be > 0.")
.isInstanceOf[IllegalArgumentException]
}
+
+ // Tests for FLINK-38753: Enrich upsert keys by equiv expressions in joins
+ @Test
+ def testJoinUpsertKeyEnrichmentInnerJoinBasic(): Unit = {
+ // Basic case from FLINK-38753: src1 pk {a1}, src2 pk {a2, b2}, join a1 = a2
+ // sink pk {a1, b2} should NOT have upsertMaterialize because {a1, b2} can be derived
+ // from {a2, b2} by substituting a2 with a1
+ util.tableEnv.executeSql("""
+ |create table src1 (
+ | a1 varchar,
+ | x1 varchar,
+ | primary key (a1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table src2 (
+ | a2 varchar,
+ | b2 varchar,
+ | x2 varchar,
+ | primary key (a2, b2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table sink (
+ | a1 varchar,
+ | b2 varchar,
+ | x1 varchar,
+ | primary key (a1, b2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select src1.a1, src2.b2, src1.x1
+ |from src1 join src2 on src1.a1 = src2.a2
+ |""".stripMargin,
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testJoinUpsertKeyEnrichmentInnerJoinReverse(): Unit = {
+ // Reverse direction: src1 pk {a1, b1}, src2 pk {a2}, join a1 = a2
+ // sink pk {a2, b1} should NOT have upsertMaterialize because {a2, b1} can be derived
+ // from {a1, b1} by substituting a1 with a2
+ util.tableEnv.executeSql("""
+ |create table src1 (
+ | a1 varchar,
+ | b1 varchar,
+ | x1 varchar,
+ | primary key (a1, b1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table src2 (
+ | a2 varchar,
+ | x2 varchar,
+ | primary key (a2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table sink (
+ | a2 varchar,
+ | b1 varchar,
+ | x1 varchar,
+ | primary key (a2, b1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select src2.a2, src1.b1, src1.x1
+ |from src1 join src2 on src1.a1 = src2.a2
+ |""".stripMargin,
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testJoinUpsertKeyEnrichmentLeftJoin(): Unit = {
+ // Left join with right to left enrichment: src1 pk {a1}, src2 pk {a2, b2}, join a1 = a2
+ // sink pk {a1, b2} should NOT have upsertMaterialize
+ util.tableEnv.executeSql("""
+ |create table src1 (
+ | a1 varchar,
+ | x1 varchar,
+ | primary key (a1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table src2 (
+ | a2 varchar,
+ | b2 varchar,
+ | x2 varchar,
+ | primary key (a2, b2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table sink (
+ | a1 varchar,
+ | b2 varchar,
+ | x1 varchar,
+ | primary key (a1, b2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select src1.a1, src2.b2, src1.x1
+ |from src1 left join src2 on src1.a1 = src2.a2
+ |""".stripMargin,
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testJoinUpsertKeyEnrichmentRightJoin(): Unit = {
+ // Right join with left to right enrichment: src1 pk {a1, b1}, src2 pk {a2}, join a1 = a2
+ // sink pk {a2, b1} should NOT have upsertMaterialize
+ util.tableEnv.executeSql("""
+ |create table src1 (
+ | a1 varchar,
+ | b1 varchar,
+ | x1 varchar,
+ | primary key (a1, b1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table src2 (
+ | a2 varchar,
+ | x2 varchar,
+ | primary key (a2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table sink (
+ | a2 varchar,
+ | b1 varchar,
+ | x1 varchar,
+ | primary key (a2, b1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select src2.a2, src1.b1, src1.x1
+ |from src1 right join src2 on src1.a1 = src2.a2
+ |""".stripMargin,
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testJoinUpsertKeyEnrichmentMultipleEquiConditions(): Unit = {
+ // Multiple equi-conditions: src1 pk {a1, b1}, src2 pk {a2, b2}, join a1 = a2 AND b1 = b2
+ // sink pk {a1, b2} should NOT have upsertMaterialize
+ util.tableEnv.executeSql("""
+ |create table src1 (
+ | a1 varchar,
+ | b1 varchar,
+ | x1 varchar,
+ | primary key (a1, b1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table src2 (
+ | a2 varchar,
+ | b2 varchar,
+ | x2 varchar,
+ | primary key (a2, b2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table sink (
+ | a1 varchar,
+ | b2 varchar,
+ | x1 varchar,
+ | primary key (a1, b2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select src1.a1, src2.b2, src1.x1
+ |from src1 join src2 on src1.a1 = src2.a2 AND src1.b1 = src2.b2
+ |""".stripMargin,
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testJoinUpsertKeyEnrichmentNegativeCase(): Unit = {
+ // Negative case: src1 pk {a1}, src2 pk {a2, b2}, join a1 = a2, sink pk {a1, c2}
+ // SHOULD have upsertMaterialize because c2 is not in the equi-join condition
+ util.tableEnv.executeSql("""
+ |create table src1 (
+ | a1 varchar,
+ | x1 varchar,
+ | primary key (a1) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table src2 (
+ | a2 varchar,
+ | b2 varchar,
+ | c2 varchar,
+ | primary key (a2, b2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table sink (
+ | a1 varchar,
+ | c2 varchar,
+ | x1 varchar,
+ | primary key (a1, c2) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select src1.a1, src2.c2, src1.x1
+ |from src1 join src2 on src1.a1 = src2.a2
+ |""".stripMargin,
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
index ee91f0d79cfd6..94df20a319dab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala
@@ -127,7 +127,7 @@ class FlinkRelOptUtilTest {
val optimized = planner.optimize(rel)
val expected1 =
"""
- |Join(joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, cnt, a0, b], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], upsertKeys=[[a, c, a0], [a, c]])
+ |Join(joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, cnt, a0, b], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], upsertKeys=[[a, c, a0], [a, c], [c, a0]])
|:- Exchange(distribution=[hash[a]], upsertKeys=[[a, c]])
|: +- GroupAggregate(groupBy=[a, c], select=[a, c, COUNT(*) AS cnt], upsertKeys=[[a, c]])
|: +- Exchange(distribution=[hash[a, c]])