From 169f0326cbc622bb60237b7fd658a6735b2be858 Mon Sep 17 00:00:00 2001 From: Yi Tang Date: Mon, 11 Apr 2022 16:05:33 +0800 Subject: [PATCH] Flink: fix out of order appends in upsert UT In Flink, only VALUES with literal tuples can be converted to a single LogicalValues node. VALUES with other type of tuples will be converted to a chain with multiple LogicalProject nodes, like following: LogicalValues -> (# number or rows)LogicalProject -> LogicalUnion. The `TO_DATE('xxxx-xx-xx')` expression is not a literal node, so the VALUES used in upsert test cases will be converted in the latter way. The order of tuples can not be guaranteed, which can lead to unstable test results. Changing `TO_DATE('xxxx-xx-xx')` to `DATE 'xxxx-xx-xx'` can make it a literal node and ensure that the VALUES expression can be converted to a single LogicalValues node. --- .../apache/iceberg/flink/TestFlinkUpsert.java | 40 +++++++++---------- .../apache/iceberg/flink/TestFlinkUpsert.java | 40 +++++++++---------- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java index 6ec35e2ff169..f0afac17a9e7 100644 --- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java +++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java @@ -130,15 +130,15 @@ public void testUpsertAndQuery() { try { sql("INSERT INTO %s VALUES " + - "(1, 'a', TO_DATE('2022-03-01'))," + - "(2, 'b', TO_DATE('2022-03-01'))," + - "(1, 'b', TO_DATE('2022-03-01'))", + "(1, 'a', DATE '2022-03-01')," + + "(2, 'b', DATE '2022-03-01')," + + "(1, 'b', DATE '2022-03-01')", tableName); sql("INSERT INTO %s VALUES " + - "(4, 'a', TO_DATE('2022-03-02'))," + - "(5, 'b', TO_DATE('2022-03-02'))," + - "(1, 'b', TO_DATE('2022-03-02'))", + "(4, 'a', DATE '2022-03-02')," + + "(5, 'b', DATE '2022-03-02')," + + "(1, 'b', DATE '2022-03-02')", tableName); List rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301)); @@ -211,9 +211,9 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { tableName, toWithClause(tableUpsertProps)); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 1)," + - "('aaa', TO_DATE('2022-03-01'), 2)," + - "('bbb', TO_DATE('2022-03-01'), 3)", + "('aaa', DATE '2022-03-01', 1)," + + "('aaa', DATE '2022-03-01', 2)," + + "('bbb', DATE '2022-03-01', 3)", tableName); TestHelpers.assertRows( @@ -221,8 +221,8 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3))); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 4)," + - "('bbb', TO_DATE('2022-03-01'), 5)", + "('aaa', DATE '2022-03-01', 4)," + + "('bbb', DATE '2022-03-01', 5)", tableName); TestHelpers.assertRows( @@ -230,8 +230,8 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5))); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 6)," + - "('bbb', TO_DATE('2022-03-01'), 7)", + "('aaa', DATE '2022-03-01', 6)," + + "('bbb', DATE '2022-03-01', 7)", tableName); TestHelpers.assertRows( @@ -254,9 +254,9 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { tableName, toWithClause(tableUpsertProps)); sql("INSERT INTO %s VALUES " + - "(1, 'aaa', TO_DATE('2022-03-01'))," + - "(2, 'aaa', TO_DATE('2022-03-01'))," + - "(3, 'bbb', TO_DATE('2022-03-01'))", + "(1, 'aaa', DATE '2022-03-01')," + + "(2, 'aaa', DATE '2022-03-01')," + + "(3, 'bbb', DATE '2022-03-01')", tableName); TestHelpers.assertRows( @@ -264,8 +264,8 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt))); sql("INSERT INTO %s VALUES " + - "(4, 'aaa', TO_DATE('2022-03-01'))," + - "(5, 'bbb', TO_DATE('2022-03-01'))", + "(4, 'aaa', DATE '2022-03-01')," + + "(5, 'bbb', DATE '2022-03-01')", tableName); TestHelpers.assertRows( @@ -273,8 +273,8 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt))); sql("INSERT INTO %s VALUES " + - "(6, 'aaa', TO_DATE('2022-03-01'))," + - "(7, 'bbb', TO_DATE('2022-03-01'))", + "(6, 'aaa', DATE '2022-03-01')," + + "(7, 'bbb', DATE '2022-03-01')", tableName); TestHelpers.assertRows( diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java index 6ec35e2ff169..f0afac17a9e7 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java @@ -130,15 +130,15 @@ public void testUpsertAndQuery() { try { sql("INSERT INTO %s VALUES " + - "(1, 'a', TO_DATE('2022-03-01'))," + - "(2, 'b', TO_DATE('2022-03-01'))," + - "(1, 'b', TO_DATE('2022-03-01'))", + "(1, 'a', DATE '2022-03-01')," + + "(2, 'b', DATE '2022-03-01')," + + "(1, 'b', DATE '2022-03-01')", tableName); sql("INSERT INTO %s VALUES " + - "(4, 'a', TO_DATE('2022-03-02'))," + - "(5, 'b', TO_DATE('2022-03-02'))," + - "(1, 'b', TO_DATE('2022-03-02'))", + "(4, 'a', DATE '2022-03-02')," + + "(5, 'b', DATE '2022-03-02')," + + "(1, 'b', DATE '2022-03-02')", tableName); List rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301)); @@ -211,9 +211,9 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { tableName, toWithClause(tableUpsertProps)); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 1)," + - "('aaa', TO_DATE('2022-03-01'), 2)," + - "('bbb', TO_DATE('2022-03-01'), 3)", + "('aaa', DATE '2022-03-01', 1)," + + "('aaa', DATE '2022-03-01', 2)," + + "('bbb', DATE '2022-03-01', 3)", tableName); TestHelpers.assertRows( @@ -221,8 +221,8 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3))); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 4)," + - "('bbb', TO_DATE('2022-03-01'), 5)", + "('aaa', DATE '2022-03-01', 4)," + + "('bbb', DATE '2022-03-01', 5)", tableName); TestHelpers.assertRows( @@ -230,8 +230,8 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5))); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 6)," + - "('bbb', TO_DATE('2022-03-01'), 7)", + "('aaa', DATE '2022-03-01', 6)," + + "('bbb', DATE '2022-03-01', 7)", tableName); TestHelpers.assertRows( @@ -254,9 +254,9 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { tableName, toWithClause(tableUpsertProps)); sql("INSERT INTO %s VALUES " + - "(1, 'aaa', TO_DATE('2022-03-01'))," + - "(2, 'aaa', TO_DATE('2022-03-01'))," + - "(3, 'bbb', TO_DATE('2022-03-01'))", + "(1, 'aaa', DATE '2022-03-01')," + + "(2, 'aaa', DATE '2022-03-01')," + + "(3, 'bbb', DATE '2022-03-01')", tableName); TestHelpers.assertRows( @@ -264,8 +264,8 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt))); sql("INSERT INTO %s VALUES " + - "(4, 'aaa', TO_DATE('2022-03-01'))," + - "(5, 'bbb', TO_DATE('2022-03-01'))", + "(4, 'aaa', DATE '2022-03-01')," + + "(5, 'bbb', DATE '2022-03-01')", tableName); TestHelpers.assertRows( @@ -273,8 +273,8 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt))); sql("INSERT INTO %s VALUES " + - "(6, 'aaa', TO_DATE('2022-03-01'))," + - "(7, 'bbb', TO_DATE('2022-03-01'))", + "(6, 'aaa', DATE '2022-03-01')," + + "(7, 'bbb', DATE '2022-03-01')", tableName); TestHelpers.assertRows(