From 9b3b81595257fc0ad00fe16310d69bd4f3a59143 Mon Sep 17 00:00:00 2001 From: Kyle Bendickson Date: Sun, 27 Mar 2022 17:20:27 -0700 Subject: [PATCH 1/5] Flink - Demonstrate bug in positional based deletes in same snapshot --- .../TestFlinkMultipleUpsertsOnNonPKField.java | 162 ++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java new file mode 100644 index 000000000000..59b61fb174cd --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestFlinkMultipleUpsertsOnNonPKField extends FlinkCatalogTestBase { + + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + MiniClusterResource.createWithClassloaderCheckDisabled(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private final boolean isStreamingJob; + private final Map tableUpsertProps = Maps.newHashMap(); + private TableEnvironment tEnv; + + public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { + super(catalogName, baseNamespace); + this.isStreamingJob = isStreamingJob; + tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2"); + tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true"); + tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + } + + @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + public static Iterable parameters() { + List parameters = Lists.newArrayList(); + for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) { + for (Boolean isStreaming : new Boolean[] {true, false}) { + // Only test with one catalog as this is a file operation concern. + // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop catalog. + String catalogName = "testhadoop"; + Namespace baseNamespace = Namespace.of("default"); + parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); + } + } + return parameters; + } + + @Override + protected TableEnvironment getTableEnv() { + if (tEnv == null) { + synchronized (this) { + EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings + .newInstance(); + if (isStreamingJob) { + settingsBuilder.inStreamingMode(); + StreamExecutionEnvironment env = StreamExecutionEnvironment + .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG); + env.enableCheckpointing(400); + env.setMaxParallelism(2); + env.setParallelism(2); + tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); + } else { + settingsBuilder.inBatchMode(); + tEnv = TableEnvironment.create(settingsBuilder.build()); + } + } + } + return tEnv; + } + + @Override + @Before + public void before() { + super.before(); + sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); + sql("USE CATALOG %s", catalogName); + sql("USE %s", DATABASE); + } + + @Override + @After + public void clean() { + sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + super.clean(); + } + + @Test + public void testMultipleUpsertsToOneRowWithNonPKFieldChanging() { + String tableName = "test_multiple_upserts_on_one_row"; + LocalDate dt = LocalDate.of(2022, 3, 1); + try { + sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT NOT NULL, bool BOOLEAN NOT NULL, " + + "PRIMARY KEY(data,dt) NOT ENFORCED) " + + "PARTITIONED BY (data) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql("INSERT INTO %s VALUES " + + "('aaa', TO_DATE('2022-03-01'), 1, false)," + + "('aaa', TO_DATE('2022-03-01'), 2, false)," + + "('bbb', TO_DATE('2022-03-01'), 3, false)", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("aaa", dt, 2, false), Row.of("bbb", dt, 3, false))); + + // Process several duplicates of the same record with PK ('aaa', TO_DATE('2022-03-01')). + // Depending on the number of times that records are inserted for that row, one of the + // rows 2 back will be used instead. + // + // Indicating possibly an issue with insertedRowMap checking and/or the positional delete + // writer. + sql("INSERT INTO %s VALUES " + + "('aaa', TO_DATE('2022-03-01'), 6, false)," + + "('aaa', TO_DATE('2022-03-01'), 6, true)," + + "('aaa', TO_DATE('2022-03-01'), 6, false)," + + "('aaa', TO_DATE('2022-03-01'), 6, false)", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } +} + From 2d219ca8bd72a6dcf65bffab00acc0d9df8ad601 Mon Sep 17 00:00:00 2001 From: Kyle Bendickson Date: Mon, 28 Mar 2022 12:50:42 -0700 Subject: [PATCH 2/5] Fix constructor name to match class and fix checkstyle --- .../iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java index 59b61fb174cd..0e83511147dc 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java @@ -31,7 +31,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.After; @@ -56,7 +55,7 @@ public class TestFlinkMultipleUpsertsOnNonPKField extends FlinkCatalogTestBase { private final Map tableUpsertProps = Maps.newHashMap(); private TableEnvironment tEnv; - public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { + public TestFlinkMultipleUpsertsOnNonPKField(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { super(catalogName, baseNamespace); this.isStreamingJob = isStreamingJob; tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2"); From ff548c1922894eddc5907bbd39e64863c67fcee1 Mon Sep 17 00:00:00 2001 From: Kyle Bendickson Date: Mon, 28 Mar 2022 12:57:24 -0700 Subject: [PATCH 3/5] Add in equivalent test case with data that does pass --- .../TestFlinkMultipleUpsertsOnNonPKField.java | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java index 0e83511147dc..11760f0a85a4 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java @@ -118,8 +118,8 @@ public void clean() { } @Test - public void testMultipleUpsertsToOneRowWithNonPKFieldChanging() { - String tableName = "test_multiple_upserts_on_one_row"; + public void testMultipleUpsertsToOneRowWithNonPKFieldChanging_fails() { + String tableName = "test_multiple_upserts_on_one_row_fails"; LocalDate dt = LocalDate.of(2022, 3, 1); try { sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT NOT NULL, bool BOOLEAN NOT NULL, " + @@ -157,5 +157,51 @@ public void testMultipleUpsertsToOneRowWithNonPKFieldChanging() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } } + + /** + * This test is the same as the one above, except the non-PK field `id` has increasing + * values instead of the same value throughout. + * + * This one passes while the other one does not. + */ + @Test + public void testMultipleUpsertsToOneRowWithNonPKFieldChanging_succeeds() { + String tableName = "test_multiple_upserts_on_one_row_succeeds"; + LocalDate dt = LocalDate.of(2022, 3, 1); + try { + sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT NOT NULL, bool BOOLEAN NOT NULL, " + + "PRIMARY KEY(data,dt) NOT ENFORCED) " + + "PARTITIONED BY (data) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql("INSERT INTO %s VALUES " + + "('aaa', TO_DATE('2022-03-01'), 1, false)," + + "('aaa', TO_DATE('2022-03-01'), 2, false)," + + "('bbb', TO_DATE('2022-03-01'), 3, false)", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("aaa", dt, 2, false), Row.of("bbb", dt, 3, false))); + + // Process several duplicates of the same record with PK ('aaa', TO_DATE('2022-03-01')). + // Depending on the number of times that records are inserted for that row, one of the + // rows 2 back will be used instead. + // + // Indicating possibly an issue with insertedRowMap checking and/or the positional delete + // writer. + sql("INSERT INTO %s VALUES " + + "('aaa', TO_DATE('2022-03-01'), 1, false)," + + "('aaa', TO_DATE('2022-03-01'), 2, false)," + + "('aaa', TO_DATE('2022-03-01'), 3, false)," + + "('aaa', TO_DATE('2022-03-01'), 6, false)", + tableName); + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } } From ccb2eda0aa0621de31a07b3b18c27f072b9242fd Mon Sep 17 00:00:00 2001 From: Kyle Bendickson Date: Tue, 10 May 2022 09:07:56 -0700 Subject: [PATCH 4/5] Dont use TO_DATE function as it generates a plan node - use DATE --- .../TestFlinkMultipleUpsertsOnNonPKField.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java index 11760f0a85a4..9f03ac5685f7 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java @@ -128,9 +128,9 @@ public void testMultipleUpsertsToOneRowWithNonPKFieldChanging_fails() { tableName, toWithClause(tableUpsertProps)); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 1, false)," + - "('aaa', TO_DATE('2022-03-01'), 2, false)," + - "('bbb', TO_DATE('2022-03-01'), 3, false)", + "('aaa', DATE '2022-03-01', 1, false)," + + "('aaa', DATE '2022-03-01', 2, false)," + + "('bbb', DATE '2022-03-01', 3, false)", tableName); TestHelpers.assertRows( @@ -175,9 +175,9 @@ public void testMultipleUpsertsToOneRowWithNonPKFieldChanging_succeeds() { tableName, toWithClause(tableUpsertProps)); sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 1, false)," + - "('aaa', TO_DATE('2022-03-01'), 2, false)," + - "('bbb', TO_DATE('2022-03-01'), 3, false)", + "('aaa', DATE '2022-03-01', 1, false)," + + "('aaa', DATE '2022-03-01', 2, false)," + + "('bbb', DATE '2022-03-01', 3, false)", tableName); TestHelpers.assertRows( @@ -191,10 +191,10 @@ public void testMultipleUpsertsToOneRowWithNonPKFieldChanging_succeeds() { // Indicating possibly an issue with insertedRowMap checking and/or the positional delete // writer. sql("INSERT INTO %s VALUES " + - "('aaa', TO_DATE('2022-03-01'), 1, false)," + - "('aaa', TO_DATE('2022-03-01'), 2, false)," + - "('aaa', TO_DATE('2022-03-01'), 3, false)," + - "('aaa', TO_DATE('2022-03-01'), 6, false)", + "('aaa', DATE '2022-03-01', 1, false)," + + "('aaa', DATE '2022-03-01', 2, false)," + + "('aaa', DATE '2022-03-01', 3, false)," + + "('aaa', DATE '2022-03-01', 6, false)", tableName); TestHelpers.assertRows( sql("SELECT * FROM %s", tableName), From 8b49bec0f3e2b8c54c6aee2534b2bf1ffa73e8e0 Mon Sep 17 00:00:00 2001 From: Kyle Bendickson Date: Tue, 10 May 2022 09:42:49 -0700 Subject: [PATCH 5/5] Checkstyle --- .../iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java index 9f03ac5685f7..d89a94781d0b 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkMultipleUpsertsOnNonPKField.java @@ -55,7 +55,10 @@ public class TestFlinkMultipleUpsertsOnNonPKField extends FlinkCatalogTestBase { private final Map tableUpsertProps = Maps.newHashMap(); private TableEnvironment tEnv; - public TestFlinkMultipleUpsertsOnNonPKField(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) { + public TestFlinkMultipleUpsertsOnNonPKField(String catalogName, + Namespace baseNamespace, + FileFormat format, + Boolean isStreamingJob) { super(catalogName, baseNamespace); this.isStreamingJob = isStreamingJob; tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");