diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java index f0afac17a9e7..a29fc981fdb7 100644 --- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java +++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java @@ -22,6 +22,11 @@ import java.time.LocalDate; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -31,9 +36,11 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -162,40 +169,12 @@ public void testUpsertAndQuery() { @Test public void testPrimaryKeyEqualToPartitionKey() { - // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey String tableName = "upsert_on_data_key"; try { sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " + "PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableUpsertProps)); - - sql("INSERT INTO %s VALUES " + - "(1, 'aaa')," + - "(2, 'aaa')," + - "(3, 'bbb')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - 
Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb"))); - - sql("INSERT INTO %s VALUES " + - "(4, 'aaa')," + - "(5, 'bbb')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb"))); - - sql("INSERT INTO %s VALUES " + - "(6, 'aaa')," + - "(7, 'bbb')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb"))); + testUpsert(tableName, UnaryOperator.identity()); } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } @@ -210,33 +189,7 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { "PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableUpsertProps)); - sql("INSERT INTO %s VALUES " + - "('aaa', DATE '2022-03-01', 1)," + - "('aaa', DATE '2022-03-01', 2)," + - "('bbb', DATE '2022-03-01', 3)", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3))); - - sql("INSERT INTO %s VALUES " + - "('aaa', DATE '2022-03-01', 4)," + - "('bbb', DATE '2022-03-01', 5)", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5))); - - sql("INSERT INTO %s VALUES " + - "('aaa', DATE '2022-03-01', 6)," + - "('bbb', DATE '2022-03-01', 7)", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7))); + testUpsert(tableName, row -> Row.of(row.getField(1), dt, row.getField(0))); } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } @@ -252,36 +205,47 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " + "PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableUpsertProps)); - - sql("INSERT INTO %s VALUES " + 
- "(1, 'aaa', DATE '2022-03-01')," + - "(2, 'aaa', DATE '2022-03-01')," + - "(3, 'bbb', DATE '2022-03-01')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt))); - - sql("INSERT INTO %s VALUES " + - "(4, 'aaa', DATE '2022-03-01')," + - "(5, 'bbb', DATE '2022-03-01')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt))); - - sql("INSERT INTO %s VALUES " + - "(6, 'aaa', DATE '2022-03-01')," + - "(7, 'bbb', DATE '2022-03-01')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt))); + testUpsert(tableName, row -> Row.of(row.getField(0), row.getField(1), dt)); } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } } + + private void testUpsert(String tableName, UnaryOperator<Row> rowMapper) { + List<String> keyOptions = Lists.newArrayList("aaa", "bbb", "ccc"); + AtomicInteger id = new AtomicInteger(0); + Random random = new Random(); + Map<String, Row> expected = Maps.newHashMap(); + for (int i = 0; i < 5; i++) { + List<Row> updatedRowSeeds = keyOptions.stream() + .flatMap(key -> IntStream.range(0, random.nextInt(4)) // multiple or none update in one insert + .mapToObj(x -> Row.of(id.getAndIncrement(), key))) + .collect(Collectors.toList()); + if (updatedRowSeeds.isEmpty()) { + updatedRowSeeds = Lists.newArrayList(Row.of(id.getAndIncrement(), keyOptions.get(0))); + } + final List<Pair<String, Row>> updatedRows = updatedRowSeeds.stream() + .map(r -> Pair.of(r.getFieldAs(1), rowMapper.apply(r))).collect(Collectors.toList()); + String values = updatedRows.stream() + .map(Pair::second).map(TestFlinkUpsert::rowToSqlValues).collect(Collectors.joining(",")); + sql("INSERT INTO %s VALUES " + values, tableName); + updatedRows.forEach(r -> expected.put(r.first(), r.second())); + TestHelpers.assertRows( + sql("SELECT * FROM %s", 
tableName), ImmutableList.copyOf(expected.values())); + } + } + + private static String rowToSqlValues(Row row) { + return IntStream.range(0, row.getArity()).mapToObj(row::getField).map(f -> { + if (f instanceof Integer) { + return f.toString(); + } else if (f instanceof String) { + return String.format("'%s'", f); + } else if (f instanceof LocalDate) { + return String.format("DATE '%s'", f); + } else { + throw new IllegalArgumentException("not reached"); + } + }).collect(Collectors.joining(",", "(", ")")); + } } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java index f0afac17a9e7..a29fc981fdb7 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java @@ -22,6 +22,11 @@ import java.time.LocalDate; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -31,9 +36,11 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -162,40 +169,12 @@ public void testUpsertAndQuery() { @Test public void 
testPrimaryKeyEqualToPartitionKey() { - // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey String tableName = "upsert_on_data_key"; try { sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " + "PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableUpsertProps)); - - sql("INSERT INTO %s VALUES " + - "(1, 'aaa')," + - "(2, 'aaa')," + - "(3, 'bbb')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb"))); - - sql("INSERT INTO %s VALUES " + - "(4, 'aaa')," + - "(5, 'bbb')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb"))); - - sql("INSERT INTO %s VALUES " + - "(6, 'aaa')," + - "(7, 'bbb')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb"))); + testUpsert(tableName, UnaryOperator.identity()); } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } @@ -210,33 +189,7 @@ public void testPrimaryKeyFieldsAtBeginningOfSchema() { "PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableUpsertProps)); - sql("INSERT INTO %s VALUES " + - "('aaa', DATE '2022-03-01', 1)," + - "('aaa', DATE '2022-03-01', 2)," + - "('bbb', DATE '2022-03-01', 3)", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3))); - - sql("INSERT INTO %s VALUES " + - "('aaa', DATE '2022-03-01', 4)," + - "('bbb', DATE '2022-03-01', 5)", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5))); - - sql("INSERT INTO %s VALUES " + - "('aaa', DATE '2022-03-01', 6)," + - "('bbb', DATE '2022-03-01', 7)", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - 
Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7))); + testUpsert(tableName, row -> Row.of(row.getField(1), dt, row.getField(0))); } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } @@ -252,36 +205,47 @@ public void testPrimaryKeyFieldsAtEndOfTableSchema() { sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " + "PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableUpsertProps)); - - sql("INSERT INTO %s VALUES " + - "(1, 'aaa', DATE '2022-03-01')," + - "(2, 'aaa', DATE '2022-03-01')," + - "(3, 'bbb', DATE '2022-03-01')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt))); - - sql("INSERT INTO %s VALUES " + - "(4, 'aaa', DATE '2022-03-01')," + - "(5, 'bbb', DATE '2022-03-01')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt))); - - sql("INSERT INTO %s VALUES " + - "(6, 'aaa', DATE '2022-03-01')," + - "(7, 'bbb', DATE '2022-03-01')", - tableName); - - TestHelpers.assertRows( - sql("SELECT * FROM %s", tableName), - Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt))); + testUpsert(tableName, row -> Row.of(row.getField(0), row.getField(1), dt)); } finally { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); } } + + private void testUpsert(String tableName, UnaryOperator<Row> rowMapper) { + List<String> keyOptions = Lists.newArrayList("aaa", "bbb", "ccc"); + AtomicInteger id = new AtomicInteger(0); + Random random = new Random(); + Map<String, Row> expected = Maps.newHashMap(); + for (int i = 0; i < 5; i++) { + List<Row> updatedRowSeeds = keyOptions.stream() + .flatMap(key -> IntStream.range(0, random.nextInt(4)) // multiple or none update in one insert + .mapToObj(x -> Row.of(id.getAndIncrement(), key))) + .collect(Collectors.toList()); + if (updatedRowSeeds.isEmpty()) { + updatedRowSeeds = 
Lists.newArrayList(Row.of(id.getAndIncrement(), keyOptions.get(0))); + } + final List<Pair<String, Row>> updatedRows = updatedRowSeeds.stream() + .map(r -> Pair.of(r.getFieldAs(1), rowMapper.apply(r))).collect(Collectors.toList()); + String values = updatedRows.stream() + .map(Pair::second).map(TestFlinkUpsert::rowToSqlValues).collect(Collectors.joining(",")); + sql("INSERT INTO %s VALUES " + values, tableName); + updatedRows.forEach(r -> expected.put(r.first(), r.second())); + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), ImmutableList.copyOf(expected.values())); + } + } + + private static String rowToSqlValues(Row row) { + return IntStream.range(0, row.getArity()).mapToObj(row::getField).map(f -> { + if (f instanceof Integer) { + return f.toString(); + } else if (f instanceof String) { + return String.format("'%s'", f); + } else if (f instanceof LocalDate) { + return String.format("DATE '%s'", f); + } else { + throw new IllegalArgumentException("not reached"); + } + }).collect(Collectors.joining(",", "(", ")")); + } }