diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 72cbf78824fa..c62a5d4a84a3 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -158,4 +158,7 @@ private TableProperties() {
 
   public static final String UPDATE_MODE = "write.update.mode";
   public static final String UPDATE_MODE_DEFAULT = "copy-on-write";
+
+  public static final String EQUALITY_FIELD_COLUMNS = "equality.field.columns";
+  public static final String DEFAULT_EQUALITY_FIELD_COLUMNS = null;
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 2fd569628864..c79d5181c88a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -51,6 +51,8 @@
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.util.StringUtils;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.DataFile;
@@ -60,6 +62,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Catalog;
@@ -70,11 +73,14 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
 
 /**
  * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
@@ -359,6 +365,13 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
 
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+    // Set the equality field columns.
+    List<String> equalityFieldColumns = toEqualityColumns(table.getSchema());
+    if (!equalityFieldColumns.isEmpty()) {
+      properties.put(TableProperties.EQUALITY_FIELD_COLUMNS, Joiner.on(',').join(equalityFieldColumns));
+    }
+
     String location = null;
     for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
       if ("location".equalsIgnoreCase(entry.getKey())) {
@@ -447,10 +460,12 @@ private static void validateFlinkTable(CatalogBaseTable table) {
     if (!schema.getWatermarkSpecs().isEmpty()) {
       throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
     }
+  }
 
-    if (schema.getPrimaryKey().isPresent()) {
-      throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
-    }
+  private static List<String> toEqualityColumns(TableSchema schema) {
+    List<String> equalityColumns = Lists.newArrayList();
+    schema.getPrimaryKey().ifPresent(uniqueConstraint -> equalityColumns.addAll(uniqueConstraint.getColumns()));
+    return equalityColumns;
   }
 
   private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
@@ -516,7 +531,26 @@ private static void commitChanges(Table table, String setLocation, String setSna
   }
 
   static CatalogTable toCatalogTable(Table table) {
-    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+    TableSchema.Builder builder = TableSchema.builder();
+
+    // Add the table columns.
+    RowType rowType = FlinkSchemaUtil.convert(table.schema());
+    for (RowType.RowField field : rowType.getFields()) {
+      builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
+    }
+
+    // Add the primary keys.
+    String concatColumns = PropertyUtil.propertyAsString(table.properties(),
+        TableProperties.EQUALITY_FIELD_COLUMNS,
+        TableProperties.DEFAULT_EQUALITY_FIELD_COLUMNS);
+    String[] columns = Splitter.on(',').splitToList(concatColumns).toArray(new String[0]);
+    if (columns.length > 0) {
+      builder.primaryKey(columns);
+    }
+
+    // Build the table schema.
+    TableSchema schema = builder.build();
+
     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
 
     // NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer may use
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 8c4486aa28c6..284dddfff007 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -21,9 +21,11 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -41,6 +43,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
@@ -200,6 +203,15 @@ public DataStreamSink build() {
     // Find out the equality field id list based on the user-provided equality field column names.
     List<Integer> equalityFieldIds = Lists.newArrayList();
+    if (equalityFieldColumns == null) {
+      String concatColumns = PropertyUtil.propertyAsString(table.properties(),
+          TableProperties.EQUALITY_FIELD_COLUMNS,
+          TableProperties.DEFAULT_EQUALITY_FIELD_COLUMNS);
+      String[] columns = StringUtils.split(concatColumns, ",");
+      if (columns != null && columns.length > 0) {
+        equalityFieldColumns = Arrays.asList(columns);
+      }
+    }
     if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
       for (String column : equalityFieldColumns) {
         org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index fd6181b0df98..5f71e8ec3afc 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -21,6 +21,11 @@
 import java.io.IOException;
 import java.util.Map;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.util.ArrayUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.CatalogProperties;
@@ -135,4 +140,18 @@ static String toWithClause(Map props) {
     builder.append(")");
     return builder.toString();
   }
+
+  static TableEnvironment createTableEnv(boolean isStreamingJob) {
+    EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+        .newInstance()
+        .useBlinkPlanner();
+    if (isStreamingJob) {
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+      env.enableCheckpointing(400);
+      return StreamTableEnvironment.create(env, settingsBuilder.inStreamingMode().build());
+    } else {
+      return TableEnvironment.create(settingsBuilder.inBatchMode().build());
+    }
+  }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index ff80d9da7e02..e9b41f9a1ea5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -29,6 +29,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -98,6 +99,21 @@ public static Record createRecord(Integer id, String data) {
     return record;
   }
 
+  private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
+      "+I", RowKind.INSERT,
+      "-D", RowKind.DELETE,
+      "-U", RowKind.UPDATE_BEFORE,
+      "+U", RowKind.UPDATE_AFTER);
+
+  public static Row createRow(String rowKind, int id, String data) {
+    RowKind kind = ROW_KIND_MAP.get(rowKind);
+    if (kind == null) {
+      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
+    }
+
+    return Row.ofKind(kind, id, data);
+  }
+
   public static RowData createRowData(Integer id, String data) {
     return GenericRowData.of(id, StringData.fromString(data));
   }
@@ -207,9 +223,18 @@ public static StructLikeSet expectedRowSet(Table table, Record... records) {
   }
 
   public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
+    table.refresh();
+    return actualRowSet(table, table.currentSnapshot().snapshotId(), columns);
+  }
+
+  public static StructLikeSet actualRowSet(Table table, long snapshotId, String... columns) throws IOException {
     table.refresh();
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
+    try (CloseableIterable<Record> reader = IcebergGenerics
+        .read(table)
+        .useSnapshot(snapshotId)
+        .select(columns)
+        .build()) {
       reader.forEach(set::add);
     }
     return set;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 0457a958837e..2cf88d66b774 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -22,6 +22,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -29,6 +30,7 @@
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -42,10 +44,14 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -158,6 +164,31 @@ public void testCreatePartitionTable() throws TableNotExistException {
     Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys());
   }
 
+  @Test
+  public void testCreateTableWithPrimaryKey() throws Exception {
+    sql("CREATE TABLE tl(id BIGINT, dt STRING, PRIMARY KEY(id, dt) NOT ENFORCED)");
+
+    Table table = table("tl");
+    Assert.assertEquals(new Schema(
+        Types.NestedField.required(1, "id", Types.LongType.get()),
+        Types.NestedField.required(2, "dt", Types.StringType.get())
+    ).asStruct(), table.schema().asStruct());
+    Assert.assertEquals("Should have the expected primary keys", "id,dt",
+        PropertyUtil.propertyAsString(table.properties(),
+            TableProperties.EQUALITY_FIELD_COLUMNS,
+            TableProperties.DEFAULT_EQUALITY_FIELD_COLUMNS));
+
+    CatalogTable catalogTable = catalogTable("tl");
+    Assert.assertTrue(catalogTable.getSchema().getTableColumn("id").isPresent());
+    Assert.assertTrue(catalogTable.getSchema().getTableColumn("dt").isPresent());
+    Assert.assertEquals(ImmutableMap.of(TableProperties.EQUALITY_FIELD_COLUMNS, "id,dt"), catalogTable.getOptions());
+
+    Optional<UniqueConstraint> constraint = catalogTable.getSchema().getPrimaryKey();
+    Assert.assertTrue("Should have a unique constraint.", constraint.isPresent());
+    Assert.assertEquals("Should have the expected primary key columns", ImmutableList.of("id", "dt"),
+        constraint.get().getColumns());
+  }
+
   @Test
   public void testLoadTransformPartitionTable() throws TableNotExistException {
     Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index f27f54cf85b4..ab1133985c45 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -20,12 +20,8 @@
 package org.apache.iceberg.flink;
 
 import java.util.List;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -71,19 +67,7 @@ public TestFlinkTableSink(String catalogName, String[] baseNamespace, FileFormat
   protected TableEnvironment getTableEnv() {
     if (tEnv == null) {
       synchronized (this) {
-        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
-            .newInstance()
-            .useBlinkPlanner();
-        if (isStreamingJob) {
-          settingsBuilder.inStreamingMode();
-          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-          env.enableCheckpointing(400);
-          tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
-        } else {
-          settingsBuilder.inBatchMode();
-          tEnv = TableEnvironment.create(settingsBuilder.build());
-        }
+        tEnv = createTableEnv(isStreamingJob);
       }
     }
     return tEnv;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkV2.java
new file mode 100644
index 000000000000..6b42768b16bf
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkV2.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkTableSinkV2 extends FlinkCatalogTestBase {
+  private static final String TABLE_NAME = "test_table";
+  private static final int ROW_ID_POS = 0;
+  private static final int ROW_DATA_POS = 1;
+
+  private final boolean partitioned;
+
+  private final StreamExecutionEnvironment env;
+
+  private StreamTableEnvironment tEnv;
+  private Map<String, String> tableProps;
+
+  @Parameterized.Parameters(name = "CatalogName={0}, Format={1}, Partitioned={2}")
+  public static Iterable<Object[]> parameters() {
+    return ImmutableList.of(
+        new Object[] {"testhive", "avro", true},
+        new Object[] {"testhive", "avro", false},
+        new Object[] {"testhive", "parquet", true},
+        new Object[] {"testhive", "parquet", false},
+        new Object[] {"testhadoop", "avro", true},
+        new Object[] {"testhadoop", "avro", false},
+        new Object[] {"testhadoop", "parquet", true},
+        new Object[] {"testhadoop", "parquet", false}
+    );
+  }
+
+  public TestFlinkTableSinkV2(String catalogName, String format, Boolean partitioned) {
+    super(catalogName, new String[0]);
+    this.partitioned = partitioned;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(400);
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    this.tableProps = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          tEnv = StreamTableEnvironment.create(env, EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inStreamingMode()
+              .build());
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testSqlChangeLogOnIdKey() throws Exception {
+    List<List<Row>> inputRowsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("-D", 2, "aaa"),
+            row("+I", 2, "bbb")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "bbb"),
+            row("+U", 2, "ccc"),
+            row("-D", 2, "ccc"),
+            row("+I", 2, "ddd")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 1, "ccc"),
+            row("-D", 1, "ccc"),
+            row("+I", 1, "ddd")
+        )
+    );
+
+    List<List<Record>> expectedRecordsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+        ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+        ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
+    );
+
+    testSqlChangeLog(ImmutableList.of("id"), row -> Row.of(row.getField(ROW_ID_POS)), inputRowsPerCheckpoint,
+        expectedRecordsPerCheckpoint);
+  }
+
+  @Test
+  public void testChangeLogOnDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("+I", 2, "ccc")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
+    );
+
+    testSqlChangeLog(ImmutableList.of("data"), row -> Row.of(row.getField(ROW_DATA_POS)), elementsPerCheckpoint,
+        expectedRecords);
+  }
+
+  @Test
+  public void testChangeLogOnIdDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))
+    );
+
+    testSqlChangeLog(ImmutableList.of("data", "id"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testPureInsertOnIdKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("+I", 2, "bbb")
+        ),
+        ImmutableList.of(
+            row("+I", 3, "ccc"),
+            row("+I", 4, "ddd")
+        ),
+        ImmutableList.of(
+            row("+I", 5, "eee"),
+            row("+I", 6, "fff")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(
+            record(1, "aaa"),
+            record(2, "bbb")
+        ),
+        ImmutableList.of(
+            record(1, "aaa"),
+            record(2, "bbb"),
+            record(3, "ccc"),
+            record(4, "ddd")
+        ),
+        ImmutableList.of(
+            record(1, "aaa"),
+            record(2, "bbb"),
+            record(3, "ccc"),
+            record(4, "ddd"),
+            record(5, "eee"),
+            record(6, "fff")
+        )
+    );
+
+    testSqlChangeLog(ImmutableList.of("data"), row -> Row.of(row.getField(ROW_DATA_POS)), elementsPerCheckpoint,
+        expectedRecords);
+  }
+
+  private static Row row(String rowKind, int id, String data) {
+    return SimpleDataUtil.createRow(rowKind, id, data);
+  }
+
+  private Record record(int id, String data) {
+    return SimpleDataUtil.createRecord(id, data);
+  }
+
+  private Table createTable(List<String> equalityColumns, boolean isPartitioned) {
+    String partitionByClause = isPartitioned ? "PARTITIONED BY (data)" : "";
+    sql("CREATE TABLE %s (id INT, data VARCHAR, PRIMARY KEY(%s) NOT ENFORCED) %s WITH %s",
+        TABLE_NAME,
+        Joiner.on(',').join(equalityColumns),
+        partitionByClause,
+        toWithClause(tableProps));
+
+    // Upgrade the iceberg table to format v2.
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    return table;
+  }
+
+  private void testSqlChangeLog(List<String> equalityColumns,
+                                KeySelector<Row, Row> keySelector,
+                                List<List<Row>> inputRowsPerCheckpoint,
+                                List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
+    Table table = createTable(equalityColumns, partitioned);
+
+    TableSchema flinkSchema = TableSchema.builder()
+        .field("id", DataTypes.INT().notNull())
+        .field("data", DataTypes.STRING().notNull())
+        .primaryKey(equalityColumns.toArray(new String[0]))
+        .build();
+    RowType rowType = (RowType) flinkSchema.toRowDataType().getLogicalType();
+    DataFormatConverters.RowConverter mapper = new DataFormatConverters.RowConverter(flinkSchema.getFieldDataTypes());
+
+    DataStream<RowData> inputStream =
+        env.addSource(new BoundedTestSource<>(inputRowsPerCheckpoint), new RowTypeInfo(flinkSchema.getFieldTypes()))
+            .keyBy(keySelector) // Shuffle by key so that different versions of the same key arrive in the correct order.
+            .map(mapper::toInternal, RowDataTypeInfo.of(rowType));
+
+    tEnv.createTemporaryView("source_change_logs", tEnv.fromDataStream(inputStream));
+
+    sql("INSERT INTO %s SELECT * FROM source_change_logs", TABLE_NAME);
+
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots(table);
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals("Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
+
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals("Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(table, expectedRecords), actualRowSet(table, snapshotId, "*"));
+    }
+  }
+
+  private List<Snapshot> findValidSnapshots(Table table) {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
+  }
+
+  private static StructLikeSet expectedRowSet(Table table, List<Record> records) {
+    return SimpleDataUtil.expectedRowSet(table, records.toArray(new Record[0]));
+  }
+
+  private static StructLikeSet actualRowSet(Table table, long snapshotId, String... columns) throws IOException {
+    return SimpleDataUtil.actualRowSet(table, snapshotId, columns);
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 93222ddc4535..53657f8a5d2e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -23,28 +23,23 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableTestBase;
-import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestTableLoader;
 import org.apache.iceberg.flink.source.BoundedTestSource;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
@@ -59,12 +54,6 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
   private static final TypeInformation<Row> ROW_TYPE_INFO =
       new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
 
-  private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
-      "+I", RowKind.INSERT,
-      "-D", RowKind.DELETE,
-      "-U", RowKind.UPDATE_BEFORE,
-      "+U", RowKind.UPDATE_AFTER);
-
   private static final int ROW_ID_POS = 0;
   private static final int ROW_DATA_POS = 1;
 
@@ -164,12 +153,7 @@ private void testChangeLogs(List equalityFieldColumns,
   }
 
   private Row row(String rowKind, int id, String data) {
-    RowKind kind = ROW_KIND_MAP.get(rowKind);
-    if (kind == null) {
-      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
-    }
-
-    return Row.ofKind(kind, id, data);
+    return SimpleDataUtil.createRow(rowKind, id, data);
   }
 
   private Record record(int id, String data) {
@@ -315,14 +299,6 @@ private StructLikeSet expectedRowSet(Record... records) {
   }
 
   private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException {
-    table.refresh();
-    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader = IcebergGenerics.read(table)
-        .useSnapshot(snapshotId)
-        .select(columns)
-        .build()) {
-      reader.forEach(set::add);
-    }
-    return set;
+    return SimpleDataUtil.actualRowSet(table, snapshotId, columns);
  }
 }
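
For reference, the round trip exercised by these tests (a Flink DDL primary key is stored as the "equality.field.columns" table property and read back as the sink's equality fields) can be driven from plain Flink SQL. A minimal sketch, mirroring TestFlinkTableSinkV2; the catalog, database, and source names are illustrative only, and the target is assumed to be a format v2 Iceberg table:

  CREATE TABLE iceberg_catalog.db.test_table (
    id INT,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
  );
  -- The created Iceberg table carries 'equality.field.columns' = 'id'.
  INSERT INTO iceberg_catalog.db.test_table SELECT id, data FROM source_change_logs;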