Skip to content

Commit 626e56f

Browse files
committed
add comment
1 parent 92a8364 commit 626e56f

File tree

1 file changed

+35
-1
lines changed

1 file changed

+35
-1
lines changed

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase {
137137
}
138138

139139
@TestTemplate
140-
def testWithSameJoinKeyColValues(): Unit = {
140+
def testSameJoinKeyColValuesWhileJoinKeyEqualsIndex(): Unit = {
141141
val data1 = List(
142142
changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
143143
changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2022, 2, 2, 2, 2, 2)),
@@ -167,6 +167,40 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase {
167167
if (enableCache) 4 else 6)
168168
}
169169

170+
@TestTemplate
171+
def testSameJoinKeyColValuesWhileJoinKeyContainsIndex(): Unit = {
172+
val data1 = List(
173+
changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
174+
changelogRow("+I", Double.box(1.0), Int.box(2), LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
175+
// mismatch
176+
changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023, 3, 3, 3, 3, 3))
177+
)
178+
179+
val data2 = List(
180+
changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
181+
changelogRow("+I", Int.box(2), Double.box(1.0), LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
182+
// mismatch
183+
changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 2, 2, 2, 2, 2))
184+
)
185+
186+
// TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the RowKind from
187+
// "+U" to "+I"
188+
val expected = List(
189+
"+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]",
190+
"+I[1.0, 1, 2021-01-01T01:01:01, 2, 1.0, 2021-01-01T01:01:01]",
191+
"+I[1.0, 2, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]",
192+
"+I[1.0, 2, 2021-01-01T01:01:01, 2, 1.0, 2021-01-01T01:01:01]"
193+
)
194+
testUpsertResult(
195+
List("a1"),
196+
List("b1"),
197+
data1,
198+
data2,
199+
"a1 = b1 and a2 = b2",
200+
expected,
201+
if (enableCache) 4 else 6)
202+
}
203+
170204
@TestTemplate
171205
def testWithNonEquiCondition1(): Unit = {
172206
val data1 = List(

0 commit comments

Comments
 (0)