diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 448b2aa2d8ef..4b5c7e4a0dd5 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -172,4 +172,12 @@ public int workerPoolSize() {
         .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
         .parse();
   }
+
+  public String branch() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.BRANCH.key())
+        .defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index f3cc52972bfe..86cb2fb0eb58 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.SnapshotRef;
 
 /** Flink sink write options */
 public class FlinkWriteOptions {
@@ -56,4 +57,8 @@ private FlinkWriteOptions() {}
   // Overrides the table's write.distribution-mode
   public static final ConfigOption<String> DISTRIBUTION_MODE =
       ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+
+  // Branch to write to
+  public static final ConfigOption<String> BRANCH =
+      ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 81706e582413..445b6a6ff97a 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -316,6 +316,11 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    public Builder toBranch(String branch) {
+      writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
+      return this;
+    }
+
     private <T> DataStreamSink<T> chainIcebergOperators() {
       Preconditions.checkArgument(
           inputCreator != null,
@@ -422,7 +427,8 @@ private SingleOutputStreamOperator<Void> appendCommitter(
             tableLoader,
             flinkWriteConf.overwriteMode(),
             snapshotProperties,
-            flinkWriteConf.workerPoolSize());
+            flinkWriteConf.workerPoolSize(),
+            flinkWriteConf.branch());
     SingleOutputStreamOperator<Void> committerStream =
         writerStream
             .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b686a76c989f..4a7d4857c57a 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -95,6 +95,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // The completed files cache for current checkpoint. Once the snapshot barrier received, it will
   // be flushed to the 'dataFilesPerCheckpoint'.
   private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+  private final String branch;
 
   // It will have an unique identifier for one job.
   private transient String flinkJobId;
@@ -125,11 +126,13 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       TableLoader tableLoader,
       boolean replacePartitions,
       Map<String, String> snapshotProperties,
-      Integer workerPoolSize) {
+      Integer workerPoolSize,
+      String branch) {
     this.tableLoader = tableLoader;
     this.replacePartitions = replacePartitions;
     this.snapshotProperties = snapshotProperties;
     this.workerPoolSize = workerPoolSize;
+    this.branch = branch;
   }
 
   @Override
@@ -179,7 +182,7 @@ public void initializeState(StateInitializationContext context) throws Exception
       // it's safe to assign the max committed checkpoint id from restored flink job to the current
       // flink job.
       this.maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId);
+          getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId, branch);
 
       NavigableMap<Long, byte[]> uncommittedDataFiles =
           Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -383,10 +386,11 @@ private void commitOperation(
       String operatorId,
       long checkpointId) {
     LOG.info(
-        "Committing {} for checkpoint {} to table {} with summary: {}",
+        "Committing {} for checkpoint {} to table {} branch {} with summary: {}",
         description,
         checkpointId,
         table.name(),
+        branch,
         summary);
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict with internal ones
@@ -394,6 +398,7 @@ private void commitOperation(
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
     operation.set(FLINK_JOB_ID, newFlinkJobId);
     operation.set(OPERATOR_ID, operatorId);
+    operation.toBranch(branch);
     long startNano = System.nanoTime();
     operation.commit(); // abort is automatically called if this fails.
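For context, the commit path above comes down to two core-API calls that this patch starts using: SnapshotUpdate#toBranch(String) routes the produced snapshot to a named ref instead of main, and Table#snapshot(String) later returns that ref's head, which getMaxCommittedCheckpointId below walks on restore. A minimal sketch of the same pattern against the core Table API, outside Flink; the table, data file, and summary property are illustrative placeholders, not code from this patch:

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class BranchCommitSketch {
  // Appends one data file to the given branch and returns the new branch head,
  // mirroring what IcebergFilesCommitter#commitOperation now does per checkpoint.
  static Snapshot appendToBranch(Table table, DataFile dataFile, String branch) {
    AppendFiles append = table.newAppend();
    append.appendFile(dataFile);
    append.set("custom-summary-property", "some-value"); // the committer stores job/checkpoint ids this way
    append.toBranch(branch); // route the snapshot to the branch instead of main
    append.commit(); // abort is called automatically if this fails, as in the committer

    // The branch head is looked up by ref name; for main this matches table.currentSnapshot().
    return table.snapshot(branch);
  }
}
```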
@@ -471,8 +476,9 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
     return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
-    Snapshot snapshot = table.currentSnapshot();
+  static long getMaxCommittedCheckpointId(
+      Table table, String flinkJobId, String operatorId, String branch) {
+    Snapshot snapshot = table.snapshot(branch);
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     while (snapshot != null) {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index e29676350855..345d88a48a3b 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -43,6 +43,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
@@ -206,12 +207,18 @@ private static List<Record> convertToRecords(List<RowData> rows) {
     return records;
   }
 
-  public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException {
-    assertTableRecords(tablePath, convertToRecords(expected));
+  public static void assertTableRows(String tablePath, List<RowData> expected, String branch)
+      throws IOException {
+    assertTableRecords(tablePath, convertToRecords(expected), branch);
   }
 
   public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
-    assertTableRecords(table, convertToRecords(expected));
+    assertTableRecords(table, convertToRecords(expected), SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRows(Table table, List<RowData> expected, String branch)
+      throws IOException {
+    assertTableRecords(table, convertToRecords(expected), branch);
   }
 
   /** Get all rows for a table */
@@ -267,13 +274,25 @@ public static void assertTableRecords(
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
+    assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRecords(Table table, List<Record> expected, String branch)
+      throws IOException {
     table.refresh();
+    Snapshot snapshot = latestSnapshot(table, branch);
+
+    if (snapshot == null) {
+      Assert.assertEquals(expected, ImmutableList.of());
+      return;
+    }
 
     Types.StructType type = table.schema().asStruct();
     StructLikeSet expectedSet = StructLikeSet.create(type);
     expectedSet.addAll(expected);
 
-    try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
       StructLikeSet actualSet = StructLikeSet.create(type);
 
       for (Record record : iterable) {
@@ -284,10 +303,27 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
     }
   }
 
+  // Returns the latest snapshot of the given branch in the table
+  public static Snapshot latestSnapshot(Table table, String branch) {
+    // For the main branch, currentSnapshot() is used to validate that the API behavior has
+    // not changed since that was the API used for validation prior to addition of branches.
+ if (branch.equals(SnapshotRef.MAIN_BRANCH)) { + return table.currentSnapshot(); + } + + return table.snapshot(branch); + } + public static void assertTableRecords(String tablePath, List expected) throws IOException { Preconditions.checkArgument(expected != null, "expected records shouldn't be null"); - assertTableRecords(new HadoopTables().load(tablePath), expected); + assertTableRecords(new HadoopTables().load(tablePath), expected, SnapshotRef.MAIN_BRANCH); + } + + public static void assertTableRecords(String tablePath, List expected, String branch) + throws IOException { + Preconditions.checkArgument(expected != null, "expected records shouldn't be null"); + assertTableRecords(new HadoopTables().load(tablePath), expected, branch); } public static StructLikeSet expectedRowSet(Table table, Record... records) { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java index c2af30d34271..23beb19a72f2 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java @@ -22,14 +22,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.iceberg.AssertHelpers; @@ -45,7 +41,6 @@ import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; -import org.apache.iceberg.flink.source.BoundedTestSource; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -60,7 +55,7 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) -public class TestFlinkIcebergSink { +public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase { @ClassRule public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = @@ -72,13 +67,6 @@ public class TestFlinkIcebergSink { public final HadoopCatalogResource catalogResource = new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); - private static final TypeInformation ROW_TYPE_INFO = - new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); - private static final DataFormatConverters.RowConverter CONVERTER = - new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); - - private Table table; - private StreamExecutionEnvironment env; private TableLoader tableLoader; private final FileFormat format; @@ -132,14 +120,6 @@ public void before() throws IOException { tableLoader = catalogResource.tableLoader(); } - private List convertToRowData(List rows) { - return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); - } - - private BoundedTestSource createBoundedSource(List rows) { - return new 
BoundedTestSource<>(rows.toArray(new Row[0])); - } - @Test public void testWriteRowData() throws Exception { List rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo")); @@ -160,19 +140,6 @@ public void testWriteRowData() throws Exception { SimpleDataUtil.assertTableRows(table, convertToRowData(rows)); } - private List createRows(String prefix) { - return Lists.newArrayList( - Row.of(1, prefix + "aaa"), - Row.of(1, prefix + "bbb"), - Row.of(1, prefix + "ccc"), - Row.of(2, prefix + "aaa"), - Row.of(2, prefix + "bbb"), - Row.of(2, prefix + "ccc"), - Row.of(3, prefix + "aaa"), - Row.of(3, prefix + "bbb"), - Row.of(3, prefix + "ccc")); - } - private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception { List rows = createRows(""); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java new file mode 100644 index 000000000000..b38aa6b50ce6 --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.types.Row; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TestFlinkIcebergSinkBase { + + protected Table table; + protected StreamExecutionEnvironment env; + protected static final TypeInformation ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + protected static final DataFormatConverters.RowConverter CONVERTER = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + protected BoundedTestSource createBoundedSource(List rows) { + return new BoundedTestSource<>(rows.toArray(new Row[0])); + } + + protected List createRows(String prefix) { + return Lists.newArrayList( + Row.of(1, prefix + "aaa"), + Row.of(1, prefix + "bbb"), + Row.of(1, prefix + "ccc"), + Row.of(2, prefix + "aaa"), + Row.of(2, prefix + "bbb"), + Row.of(2, prefix + "ccc"), + Row.of(3, prefix + "aaa"), + Row.of(3, prefix + "bbb"), + Row.of(3, prefix + "ccc")); + } + + protected List convertToRowData(List rows) { + return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java new file mode 100644 index 000000000000..16b4542b00d3 --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogResource; +import org.apache.iceberg.flink.MiniClusterResource; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase { + + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + MiniClusterResource.createWithClassloaderCheckDisabled(); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopCatalogResource catalogResource = + new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); + + private final String branch; + private TableLoader tableLoader; + + @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}") + public static Object[] parameters() { + return new Object[] {"main", "testBranch"}; + } + + public TestFlinkIcebergSinkBranch(String branch) { + this.branch = branch; + } + + @Before + public void before() throws IOException { + table = + catalogResource + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, + FileFormat.AVRO.name(), + TableProperties.FORMAT_VERSION, + "1")); + + env = + StreamExecutionEnvironment.getExecutionEnvironment( + MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100); + + tableLoader = catalogResource.tableLoader(); + } + + @Test + public void testWriteRowWithTableSchema() throws Exception { + testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE); + verifyOtherBranchUnmodified(); + } + + private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) + throws Exception { + List rows = createRows(""); + DataStream dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO); + + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .toBranch(branch) + .distributionMode(distributionMode) + .append(); + + // Execute the program. + env.execute("Test Iceberg DataStream."); + + SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch); + SimpleDataUtil.assertTableRows( + table, + ImmutableList.of(), + branch.equals(SnapshotRef.MAIN_BRANCH) ? 
"test-branch" : SnapshotRef.MAIN_BRANCH); + + verifyOtherBranchUnmodified(); + } + + private void verifyOtherBranchUnmodified() { + String otherBranch = + branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH; + if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) { + Assert.assertNull(table.currentSnapshot()); + } + + Assert.assertTrue(table.snapshot(otherBranch) == null); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java index 916c337bee6f..7733eeb4a31f 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java @@ -18,38 +18,25 @@ */ package org.apache.iceberg.flink.sink; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.data.IcebergGenerics; -import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.HadoopCatalogResource; import org.apache.iceberg.flink.MiniClusterResource; import org.apache.iceberg.flink.SimpleDataUtil; -import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.BoundedTestSource; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; @@ -60,7 +47,7 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) -public class TestFlinkIcebergSinkV2 { +public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base { @ClassRule public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = @@ -72,29 +59,6 @@ public class TestFlinkIcebergSinkV2 { public final HadoopCatalogResource catalogResource = new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); - private static final int FORMAT_V2 = 2; - private static final TypeInformation ROW_TYPE_INFO = - new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); - - private static final Map ROW_KIND_MAP = - ImmutableMap.of( - "+I", RowKind.INSERT, - "-D", RowKind.DELETE, - "-U", RowKind.UPDATE_BEFORE, - "+U", RowKind.UPDATE_AFTER); - - private static final int ROW_ID_POS = 0; - private static final int ROW_DATA_POS = 1; - - private final FileFormat format; - private 
final int parallelism; - private final boolean partitioned; - private final String writeDistributionMode; - - private Table table; - private StreamExecutionEnvironment env; - private TableLoader tableLoader; - @Parameterized.Parameters( name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}") public static Object[][] parameters() { @@ -155,67 +119,6 @@ public void setupTable() { tableLoader = catalogResource.tableLoader(); } - private List findValidSnapshots() { - List validSnapshots = Lists.newArrayList(); - for (Snapshot snapshot : table.snapshots()) { - if (snapshot.allManifests(table.io()).stream() - .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) { - validSnapshots.add(snapshot); - } - } - return validSnapshots; - } - - private void testChangeLogs( - List equalityFieldColumns, - KeySelector keySelector, - boolean insertAsUpsert, - List> elementsPerCheckpoint, - List> expectedRecordsPerCheckpoint) - throws Exception { - DataStream dataStream = - env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO); - - FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) - .tableLoader(tableLoader) - .tableSchema(SimpleDataUtil.FLINK_SCHEMA) - .writeParallelism(parallelism) - .equalityFieldColumns(equalityFieldColumns) - .upsert(insertAsUpsert) - .append(); - - // Execute the program. - env.execute("Test Iceberg Change-Log DataStream."); - - table.refresh(); - List snapshots = findValidSnapshots(); - int expectedSnapshotNum = expectedRecordsPerCheckpoint.size(); - Assert.assertEquals( - "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size()); - - for (int i = 0; i < expectedSnapshotNum; i++) { - long snapshotId = snapshots.get(i).snapshotId(); - List expectedRecords = expectedRecordsPerCheckpoint.get(i); - Assert.assertEquals( - "Should have the expected records for the checkpoint#" + i, - expectedRowSet(expectedRecords.toArray(new Record[0])), - actualRowSet(snapshotId, "*")); - } - } - - private Row row(String rowKind, int id, String data) { - RowKind kind = ROW_KIND_MAP.get(rowKind); - if (kind == null) { - throw new IllegalArgumentException("Unknown row kind: " + rowKind); - } - - return Row.ofKind(kind, id, data); - } - - private Record record(int id, String data) { - return SimpleDataUtil.createRecord(id, data); - } - @Test public void testCheckAndGetEqualityFieldIds() { table @@ -249,136 +152,22 @@ public void testCheckAndGetEqualityFieldIds() { @Test public void testChangeLogOnIdKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - ImmutableList.of( - row("+I", 1, "aaa"), - row("-D", 1, "aaa"), - row("+I", 1, "bbb"), - row("+I", 2, "aaa"), - row("-D", 2, "aaa"), - row("+I", 2, "bbb")), - ImmutableList.of( - row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")), - ImmutableList.of( - row("-D", 1, "bbb"), - row("+I", 1, "ccc"), - row("-D", 1, "ccc"), - row("+I", 1, "ddd"))); - - List> expectedRecords = - ImmutableList.of( - ImmutableList.of(record(1, "bbb"), record(2, "bbb")), - ImmutableList.of(record(1, "bbb"), record(2, "ddd")), - ImmutableList.of(record(1, "ddd"), record(2, "ddd"))); - - if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) { - AssertHelpers.assertThrows( - "Should be error because equality field columns don't include all partition keys", - IllegalStateException.class, - "should be included in equality fields", - () -> { - testChangeLogs( - ImmutableList.of("id"), - row -> 
row.getField(ROW_ID_POS), - false, - elementsPerCheckpoint, - expectedRecords); - return null; - }); - } else { - testChangeLogs( - ImmutableList.of("id"), - row -> row.getField(ROW_ID_POS), - false, - elementsPerCheckpoint, - expectedRecords); - } + testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH); } @Test public void testChangeLogOnDataKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - ImmutableList.of( - row("+I", 1, "aaa"), - row("-D", 1, "aaa"), - row("+I", 2, "bbb"), - row("+I", 1, "bbb"), - row("+I", 2, "aaa")), - ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")), - ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc"))); - - List> expectedRecords = - ImmutableList.of( - ImmutableList.of(record(1, "bbb"), record(2, "aaa")), - ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")), - ImmutableList.of( - record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))); - - testChangeLogs( - ImmutableList.of("data"), - row -> row.getField(ROW_DATA_POS), - false, - elementsPerCheckpoint, - expectedRecords); + testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH); } @Test public void testChangeLogOnIdDataKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - ImmutableList.of( - row("+I", 1, "aaa"), - row("-D", 1, "aaa"), - row("+I", 2, "bbb"), - row("+I", 1, "bbb"), - row("+I", 2, "aaa")), - ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")), - ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"))); - - List> expectedRecords = - ImmutableList.of( - ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")), - ImmutableList.of( - record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")), - ImmutableList.of( - record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))); - - testChangeLogs( - ImmutableList.of("data", "id"), - row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), - false, - elementsPerCheckpoint, - expectedRecords); + testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH); } @Test public void testChangeLogOnSameKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - // Checkpoint #1 - ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")), - // Checkpoint #2 - ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")), - // Checkpoint #3 - ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")), - // Checkpoint #4 - ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa"))); - - List> expectedRecords = - ImmutableList.of( - ImmutableList.of(record(1, "aaa")), - ImmutableList.of(record(1, "aaa")), - ImmutableList.of(record(1, "aaa")), - ImmutableList.of(record(1, "aaa"), record(1, "aaa"))); - - testChangeLogs( - ImmutableList.of("id", "data"), - row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), - false, - elementsPerCheckpoint, - expectedRecords); + testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH); } @Test @@ -408,97 +197,16 @@ public void testUpsertModeCheck() throws Exception { @Test public void testUpsertOnIdKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")), - ImmutableList.of(row("+I", 1, "ccc")), - ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee"))); - - List> expectedRecords = - ImmutableList.of( - ImmutableList.of(record(1, "bbb")), - ImmutableList.of(record(1, "ccc")), - 
ImmutableList.of(record(1, "eee"))); - - if (!partitioned) { - testChangeLogs( - ImmutableList.of("id"), - row -> row.getField(ROW_ID_POS), - true, - elementsPerCheckpoint, - expectedRecords); - } else { - AssertHelpers.assertThrows( - "Should be error because equality field columns don't include all partition keys", - IllegalStateException.class, - "should be included in equality fields", - () -> { - testChangeLogs( - ImmutableList.of("id"), - row -> row.getField(ROW_ID_POS), - true, - elementsPerCheckpoint, - expectedRecords); - return null; - }); - } + testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH); } @Test public void testUpsertOnDataKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")), - ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")), - ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb"))); - - List> expectedRecords = - ImmutableList.of( - ImmutableList.of(record(2, "aaa"), record(3, "bbb")), - ImmutableList.of(record(4, "aaa"), record(5, "bbb")), - ImmutableList.of(record(6, "aaa"), record(7, "bbb"))); - - testChangeLogs( - ImmutableList.of("data"), - row -> row.getField(ROW_DATA_POS), - true, - elementsPerCheckpoint, - expectedRecords); + testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH); } @Test public void testUpsertOnIdDataKey() throws Exception { - List> elementsPerCheckpoint = - ImmutableList.of( - ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")), - ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")), - ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa"))); - - List> expectedRecords = - ImmutableList.of( - ImmutableList.of(record(1, "aaa"), record(2, "bbb")), - ImmutableList.of(record(1, "aaa"), record(2, "ccc")), - ImmutableList.of(record(1, "bbb"), record(2, "ccc"))); - - testChangeLogs( - ImmutableList.of("id", "data"), - row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), - true, - elementsPerCheckpoint, - expectedRecords); - } - - private StructLikeSet expectedRowSet(Record... records) { - return SimpleDataUtil.expectedRowSet(table, records); - } - - private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException { - table.refresh(); - StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); - try (CloseableIterable reader = - IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) { - reader.forEach(set::add); - } - return set; + testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH); } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java new file mode 100644 index 000000000000..15380408e474 --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; + +public class TestFlinkIcebergSinkV2Base { + + protected static final int FORMAT_V2 = 2; + protected static final TypeInformation ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + protected static final int ROW_ID_POS = 0; + protected static final int ROW_DATA_POS = 1; + + protected int parallelism = 1; + protected TableLoader tableLoader; + protected Table table; + protected StreamExecutionEnvironment env; + protected FileFormat format; + protected boolean partitioned; + protected String writeDistributionMode; + + protected static final Map ROW_KIND_MAP = + ImmutableMap.of( + "+I", RowKind.INSERT, + "-D", RowKind.DELETE, + "-U", RowKind.UPDATE_BEFORE, + "+U", RowKind.UPDATE_AFTER); + + protected Row row(String rowKind, int id, String data) { + RowKind kind = ROW_KIND_MAP.get(rowKind); + if (kind == null) { + throw new IllegalArgumentException("Unknown row kind: " + rowKind); + } + + return Row.ofKind(kind, id, data); + } + + protected void testUpsertOnIdDataKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")), + ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")), + ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "aaa"), record(2, "bbb")), + ImmutableList.of(record(1, "aaa"), record(2, "ccc")), + ImmutableList.of(record(1, "bbb"), record(2, "ccc"))); + testChangeLogs( + ImmutableList.of("id", "data"), + row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + true, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + protected void testChangeLogOnIdDataKey(String branch) throws 
Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("-D", 1, "aaa"), + row("+I", 2, "bbb"), + row("+I", 1, "bbb"), + row("+I", 2, "aaa")), + ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")), + ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")), + ImmutableList.of( + record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")), + ImmutableList.of( + record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))); + + testChangeLogs( + ImmutableList.of("data", "id"), + row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + protected void testChangeLogOnSameKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + // Checkpoint #1 + ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")), + // Checkpoint #2 + ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")), + // Checkpoint #3 + ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")), + // Checkpoint #4 + ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "aaa")), + ImmutableList.of(record(1, "aaa")), + ImmutableList.of(record(1, "aaa")), + ImmutableList.of(record(1, "aaa"), record(1, "aaa"))); + + testChangeLogs( + ImmutableList.of("id", "data"), + row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + protected void testChangeLogOnDataKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("-D", 1, "aaa"), + row("+I", 2, "bbb"), + row("+I", 1, "bbb"), + row("+I", 2, "aaa")), + ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")), + ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb"), record(2, "aaa")), + ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")), + ImmutableList.of( + record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))); + + testChangeLogs( + ImmutableList.of("data"), + row -> row.getField(ROW_DATA_POS), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + protected void testUpsertOnDataKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")), + ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")), + ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(2, "aaa"), record(3, "bbb")), + ImmutableList.of(record(4, "aaa"), record(5, "bbb")), + ImmutableList.of(record(6, "aaa"), record(7, "bbb"))); + + testChangeLogs( + ImmutableList.of("data"), + row -> row.getField(ROW_DATA_POS), + true, + elementsPerCheckpoint, + expectedRecords, + branch); + } + + protected void testChangeLogOnIdKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("-D", 1, "aaa"), + row("+I", 1, "bbb"), 
+ row("+I", 2, "aaa"), + row("-D", 2, "aaa"), + row("+I", 2, "bbb")), + ImmutableList.of( + row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")), + ImmutableList.of( + row("-D", 1, "bbb"), + row("+I", 1, "ccc"), + row("-D", 1, "ccc"), + row("+I", 1, "ddd"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb"), record(2, "bbb")), + ImmutableList.of(record(1, "bbb"), record(2, "ddd")), + ImmutableList.of(record(1, "ddd"), record(2, "ddd"))); + + if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) { + AssertHelpers.assertThrows( + "Should be error because equality field columns don't include all partition keys", + IllegalStateException.class, + "should be included in equality fields", + () -> { + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + return null; + }); + } else { + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + false, + elementsPerCheckpoint, + expectedRecords, + branch); + } + } + + protected void testUpsertOnIdKey(String branch) throws Exception { + List> elementsPerCheckpoint = + ImmutableList.of( + ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")), + ImmutableList.of(row("+I", 1, "ccc")), + ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee"))); + + List> expectedRecords = + ImmutableList.of( + ImmutableList.of(record(1, "bbb")), + ImmutableList.of(record(1, "ccc")), + ImmutableList.of(record(1, "eee"))); + + if (!partitioned) { + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + true, + elementsPerCheckpoint, + expectedRecords, + branch); + } else { + AssertHelpers.assertThrows( + "Should be error because equality field columns don't include all partition keys", + IllegalStateException.class, + "should be included in equality fields", + () -> { + testChangeLogs( + ImmutableList.of("id"), + row -> row.getField(ROW_ID_POS), + true, + elementsPerCheckpoint, + expectedRecords, + branch); + return null; + }); + } + } + + protected void testChangeLogs( + List equalityFieldColumns, + KeySelector keySelector, + boolean insertAsUpsert, + List> elementsPerCheckpoint, + List> expectedRecordsPerCheckpoint, + String branch) + throws Exception { + DataStream dataStream = + env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO); + + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .tableLoader(tableLoader) + .tableSchema(SimpleDataUtil.FLINK_SCHEMA) + .writeParallelism(parallelism) + .equalityFieldColumns(equalityFieldColumns) + .upsert(insertAsUpsert) + .toBranch(branch) + .append(); + + // Execute the program. 
+ env.execute("Test Iceberg Change-Log DataStream."); + + table.refresh(); + List snapshots = findValidSnapshots(); + int expectedSnapshotNum = expectedRecordsPerCheckpoint.size(); + Assert.assertEquals( + "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size()); + + for (int i = 0; i < expectedSnapshotNum; i++) { + long snapshotId = snapshots.get(i).snapshotId(); + List expectedRecords = expectedRecordsPerCheckpoint.get(i); + Assert.assertEquals( + "Should have the expected records for the checkpoint#" + i, + expectedRowSet(expectedRecords.toArray(new Record[0])), + actualRowSet(snapshotId, "*")); + } + } + + protected Record record(int id, String data) { + return SimpleDataUtil.createRecord(id, data); + } + + private List findValidSnapshots() { + List validSnapshots = Lists.newArrayList(); + for (Snapshot snapshot : table.snapshots()) { + if (snapshot.allManifests(table.io()).stream() + .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) { + validSnapshots.add(snapshot); + } + } + return validSnapshots; + } + + private StructLikeSet expectedRowSet(Record... records) { + return SimpleDataUtil.expectedRowSet(table, records); + } + + private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException { + table.refresh(); + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = + IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java new file mode 100644 index 000000000000..fed333848279 --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogResource; +import org.apache.iceberg.flink.MiniClusterResource; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base { + + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + MiniClusterResource.createWithClassloaderCheckDisabled(); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final HadoopCatalogResource catalogResource = + new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE); + + private final String branch; + + @Parameterized.Parameters(name = "branch = {0}") + public static Object[] parameters() { + return new Object[] {"main", "testBranch"}; + } + + public TestFlinkIcebergSinkV2Branch(String branch) { + this.branch = branch; + } + + @Before + public void before() throws IOException { + table = + catalogResource + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, + FileFormat.AVRO.name(), + TableProperties.FORMAT_VERSION, + "2")); + + env = + StreamExecutionEnvironment.getExecutionEnvironment( + MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100); + + tableLoader = catalogResource.tableLoader(); + } + + @Test + public void testChangeLogOnIdKey() throws Exception { + testChangeLogOnIdKey(branch); + verifyOtherBranchUnmodified(); + } + + @Test + public void testChangeLogOnDataKey() throws Exception { + testChangeLogOnDataKey(branch); + verifyOtherBranchUnmodified(); + } + + @Test + public void testChangeLogOnIdDataKey() throws Exception { + testChangeLogOnIdDataKey(branch); + verifyOtherBranchUnmodified(); + } + + @Test + public void testUpsertOnIdKey() throws Exception { + testUpsertOnIdKey(branch); + verifyOtherBranchUnmodified(); + } + + @Test + public void testUpsertOnDataKey() throws Exception { + testUpsertOnDataKey(branch); + verifyOtherBranchUnmodified(); + } + + @Test + public void testUpsertOnIdDataKey() throws Exception { + testUpsertOnIdDataKey(branch); + verifyOtherBranchUnmodified(); + } + + private void verifyOtherBranchUnmodified() { + String otherBranch = + branch.equals(SnapshotRef.MAIN_BRANCH) ? 
"test-branch" : SnapshotRef.MAIN_BRANCH; + if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) { + Assert.assertNull(table.currentSnapshot()); + } + + Assert.assertTrue(table.snapshot(otherBranch) == null); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 66baaeb0e998..a4f29d47f491 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -76,22 +76,24 @@ public class TestIcebergFilesCommitter extends TableTestBase { private File flinkManifestFolder; private final FileFormat format; + private final String branch; - @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}") + @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion = {1}, branch = {2}") public static Object[][] parameters() { return new Object[][] { - new Object[] {"avro", 1}, - new Object[] {"avro", 2}, - new Object[] {"parquet", 1}, - new Object[] {"parquet", 2}, - new Object[] {"orc", 1}, - new Object[] {"orc", 2} + new Object[] {"avro", 1, "main"}, + new Object[] {"avro", 2, "test-branch"}, + new Object[] {"parquet", 1, "main"}, + new Object[] {"parquet", 2, "test-branch"}, + new Object[] {"orc", 1, "main"}, + new Object[] {"orc", 2, "test-branch"} }; } - public TestIcebergFilesCommitter(String format, int formatVersion) { + public TestIcebergFilesCommitter(String format, int formatVersion, String branch) { super(formatVersion); this.format = FileFormat.fromString(format); + this.branch = branch; } @Override @@ -125,7 +127,7 @@ public void testCommitTxnWithoutDataFiles() throws Exception { harness.open(); operatorId = harness.getOperator().getOperatorID(); - SimpleDataUtil.assertTableRows(table, Lists.newArrayList()); + SimpleDataUtil.assertTableRows(table, Lists.newArrayList(), branch); assertSnapshotSize(0); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); @@ -204,12 +206,12 @@ public void testCommitTxn() throws Exception { harness.notifyOfCompletedCheckpoint(i); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows)); + SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch); assertSnapshotSize(i); assertMaxCommittedCheckpointId(jobID, operatorId, i); Assert.assertEquals( TestIcebergFilesCommitter.class.getName(), - table.currentSnapshot().summary().get("flink.test")); + SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test")); } } } @@ -255,13 +257,13 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { // 3. notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(firstCheckpointId); - SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch); assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId); assertFlinkManifests(1); // 4. notifyCheckpointComplete for checkpoint#2 harness.notifyOfCompletedCheckpoint(secondCheckpointId); - SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch); assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId); assertFlinkManifests(0); } @@ -308,13 +310,13 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { // 3. 
notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);

       // 4. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
@@ -348,7 +350,7 @@ public void testRecoveryFromValidSnapshot() throws Exception {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row), branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
@@ -360,7 +362,7 @@ public void testRecoveryFromValidSnapshot() throws Exception {
       harness.initializeState(snapshot);
       harness.open();

-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);

@@ -375,7 +377,7 @@ public void testRecoveryFromValidSnapshot() throws Exception {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
@@ -406,7 +408,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except
       harness.processElement(of(dataFile), ++timestamp);
       snapshot = harness.snapshot(++checkpointId, ++timestamp);

-      SimpleDataUtil.assertTableRows(table, ImmutableList.of());
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
       assertFlinkManifests(1);
     }
@@ -421,7 +423,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except
       // transaction.
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);

       harness.snapshot(++checkpointId, ++timestamp);
@@ -431,7 +433,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);

@@ -459,7 +461,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except
       assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);

-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(3);

       RowData row = SimpleDataUtil.createRowData(3, "foo");
@@ -473,7 +475,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
     }
@@ -509,7 +511,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(i);
       assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
     }
@@ -540,7 +542,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
     }
@@ -577,7 +579,7 @@ public void testMultipleJobsWriteSameTable() throws Exception {
       harness.notifyOfCompletedCheckpoint(checkpointId + 1);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(i + 1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
     }
@@ -628,7 +630,7 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
       assertFlinkManifests(1);

       // Only the first row is committed at this point
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
@@ -651,7 +653,7 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
       // transaction.
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);

@@ -675,7 +677,7 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
       harness2.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);

@@ -702,12 +704,12 @@ public void testBoundedStream() throws Exception {
       ((BoundedOneInput) harness.getOneInputOperator()).endInput();

       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);

       Assert.assertEquals(
           TestIcebergFilesCommitter.class.getName(),
-          table.currentSnapshot().summary().get("flink.test"));
+          SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
     }
   }
@@ -748,7 +750,7 @@ public void testFlinkManifests() throws Exception {

       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
@@ -794,7 +796,7 @@ public void testDeleteFiles() throws Exception {

       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);

@@ -816,7 +818,7 @@ public void testDeleteFiles() throws Exception {

       // 6. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
@@ -867,7 +869,7 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {

       // Notify the 2nd snapshot to complete.
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
       Assert.assertEquals(
@@ -951,7 +953,7 @@ private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID,
     table.refresh();
     long actualId =
         IcebergFilesCommitter.getMaxCommittedCheckpointId(
-            table, jobID.toString(), operatorID.toHexString());
+            table, jobID.toString(), operatorID.toHexString(), branch);
     Assert.assertEquals(expectedId, actualId);
   }

@@ -962,7 +964,7 @@ private void assertSnapshotSize(int expectedSnapshotSize) {

   private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
       throws Exception {
-    TestOperatorFactory factory = TestOperatorFactory.of(table.location());
+    TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch);
     return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
   }

@@ -982,13 +984,15 @@ private static MockEnvironment createEnvironment(JobID jobID) {
   private static class TestOperatorFactory extends AbstractStreamOperatorFactory<Void>
       implements OneInputStreamOperatorFactory<WriteResult, Void> {
     private final String tablePath;
+    private final String branch;

-    private TestOperatorFactory(String tablePath) {
+    private TestOperatorFactory(String tablePath, String branch) {
       this.tablePath = tablePath;
+      this.branch = branch;
     }

-    private static TestOperatorFactory of(String tablePath) {
-      return new TestOperatorFactory(tablePath);
+    private static TestOperatorFactory of(String tablePath, String branch) {
+      return new TestOperatorFactory(tablePath, branch);
     }

     @Override
@@ -1000,7 +1004,8 @@ public <T extends StreamOperator<Void>> T createStreamOperator(
               new TestTableLoader(tablePath),
               false,
               Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()),
-              ThreadPools.WORKER_THREAD_POOL_SIZE);
+              ThreadPools.WORKER_THREAD_POOL_SIZE,
+              branch);
       committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
       return (T) committer;
     }
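
Usage sketch (illustrative only, not taken from the patch): with the branch option plumbed through the writer, committer, and test factory above, a streaming job can direct its checkpointed commits at a non-main ref. The class and method names below (BranchSinkExample, appendToAuditBranch) and the "audit" branch name are hypothetical, and the branch is assumed to already exist on the target table; only FlinkSink.forRowData(...), tableLoader(...), append(), and the new toBranch(...) builder hook come from the actual API.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class BranchSinkExample {
  // Builds an Iceberg sink whose commits land on the "audit" branch instead of main,
  // using the toBranch(...) builder hook introduced by this change.
  public static void appendToAuditBranch(DataStream<RowData> rowStream, TableLoader tableLoader) {
    FlinkSink.forRowData(rowStream)
        .tableLoader(tableLoader)
        .toBranch("audit")
        .append();
  }
}

Snapshots committed this way advance only the "audit" ref; the main branch is unaffected until the branch is fast-forwarded or cherry-picked into it.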