diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db9a7..eca662036513 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final RowDataWrapper wrapper;
+  private final RowDataWrapper keyWrapper;
+  private final RowDataProjection keyProjection;
   private final boolean upsert;
 
   BaseDeltaTaskWriter(PartitionSpec spec,
@@ -58,6 +62,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
     this.upsert = upsert;
+    this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+    this.keyProjection = RowDataProjection.create(schema, deleteSchema);
   }
 
   abstract RowDataDeltaWriter route(RowData row);
@@ -74,7 +80,7 @@ public void write(RowData row) throws IOException {
       case INSERT:
       case UPDATE_AFTER:
         if (upsert) {
-          writer.delete(row);
+          writer.deleteKey(keyProjection.wrap(row));
         }
         writer.write(row);
         break;
@@ -101,7 +107,7 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
 
     @Override
     protected StructLike asStructLike(RowData data) {
-      return wrapper.wrap(data);
+      return keyWrapper.wrap(data);
     }
   }
 }
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 2849100858a1..5144305029f4 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public RowDataTaskWriterFactory(Table table,
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+    } else if (upsert) {
+      // In upsert mode, only the new row is emitted with the INSERT row kind. Any column of the inserted row other
+      // than the primary key fields may therefore differ from the deleted row, while the delete file must contain
+      // values that are correct for the deleted row. Hence, only the equality delete fields are written.
+      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+          ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
     } else {
-      // TODO provide the ability to customize the equality-delete row schema.
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
           ArrayUtil.toIntArray(equalityFieldIds), schema, null);
     }
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 000000000000..6ec35e2ff169
--- /dev/null
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final boolean isStreamingJob;
+  private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+  private TableEnvironment tEnv;
+
+  public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
+    super(catalogName, baseNamespace);
+    this.isStreamingJob = isStreamingJob;
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
"catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + public static Iterable parameters() { + List parameters = Lists.newArrayList(); + for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) { + for (Boolean isStreaming : new Boolean[] {true, false}) { + // Only test with one catalog as this is a file operation concern. + // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop catalog. + String catalogName = "testhadoop"; + Namespace baseNamespace = Namespace.of("default"); + parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); + } + } + return parameters; + } + + @Override + protected TableEnvironment getTableEnv() { + if (tEnv == null) { + synchronized (this) { + EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings + .newInstance(); + if (isStreamingJob) { + settingsBuilder.inStreamingMode(); + StreamExecutionEnvironment env = StreamExecutionEnvironment + .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG); + env.enableCheckpointing(400); + env.setMaxParallelism(2); + env.setParallelism(2); + tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); + } else { + settingsBuilder.inBatchMode(); + tEnv = TableEnvironment.create(settingsBuilder.build()); + } + } + } + return tEnv; + } + + @Override + @Before + public void before() { + super.before(); + sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase); + sql("USE CATALOG %s", catalogName); + sql("USE %s", DATABASE); + } + + @Override + @After + public void clean() { + sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + super.clean(); + } + + @Test + public void testUpsertAndQuery() { + String tableName = "test_upsert_query"; + LocalDate dt20220301 = LocalDate.of(2022, 3, 1); + LocalDate dt20220302 = LocalDate.of(2022, 3, 2); + + sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) " + + "PARTITIONED BY (province) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + try { + sql("INSERT INTO %s VALUES " + + "(1, 'a', TO_DATE('2022-03-01'))," + + "(2, 'b', TO_DATE('2022-03-01'))," + + "(1, 'b', TO_DATE('2022-03-01'))", + tableName); + + sql("INSERT INTO %s VALUES " + + "(4, 'a', TO_DATE('2022-03-02'))," + + "(5, 'b', TO_DATE('2022-03-02'))," + + "(1, 'b', TO_DATE('2022-03-02'))", + tableName); + + List rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301)); + TestHelpers.assertRows( + sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), + rowsOn20220301); + + List rowsOn20220302 = Lists.newArrayList( + Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302)); + TestHelpers.assertRows( + sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), + rowsOn20220302); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } + + @Test + public void testPrimaryKeyEqualToPartitionKey() { + // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey + String tableName = "upsert_on_data_key"; + try { + sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " + + "PARTITIONED BY (data) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql("INSERT INTO %s VALUES " + + "(1, 'aaa')," + + "(2, 'aaa')," + + "(3, 'bbb')", + 
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa')," +
+          "(5, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa')," +
+          "(7, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+    String tableName = "upsert_on_pk_at_schema_start";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) " +
+          "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 1)," +
+          "('aaa', TO_DATE('2022-03-01'), 2)," +
+          "('bbb', TO_DATE('2022-03-01'), 3)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 4)," +
+          "('bbb', TO_DATE('2022-03-01'), 5)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 6)," +
+          "('bbb', TO_DATE('2022-03-01'), 7)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+    // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, but the primary key fields
+    // are located at the end of the flink schema.
+ String tableName = "upsert_on_pk_at_schema_end"; + LocalDate dt = LocalDate.of(2022, 3, 1); + try { + sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " + + "PARTITIONED BY (data) WITH %s", + tableName, toWithClause(tableUpsertProps)); + + sql("INSERT INTO %s VALUES " + + "(1, 'aaa', TO_DATE('2022-03-01'))," + + "(2, 'aaa', TO_DATE('2022-03-01'))," + + "(3, 'bbb', TO_DATE('2022-03-01'))", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt))); + + sql("INSERT INTO %s VALUES " + + "(4, 'aaa', TO_DATE('2022-03-01'))," + + "(5, 'bbb', TO_DATE('2022-03-01'))", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt))); + + sql("INSERT INTO %s VALUES " + + "(6, 'aaa', TO_DATE('2022-03-01'))," + + "(7, 'bbb', TO_DATE('2022-03-01'))", + tableName); + + TestHelpers.assertRows( + sql("SELECT * FROM %s", tableName), + Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt))); + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + } +} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index 9cce4a04308a..0c30b09166fc 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; @@ -28,6 +30,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; @@ -35,9 +38,11 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.source.BoundedTableFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -58,6 +63,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source"; private static final String TABLE_NAME = "test_table"; private TableEnvironment tEnv; private Table icebergTable; @@ -127,6 +133,7 @@ public void before() { public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + BoundedTableFactory.clearDataSets(); super.clean(); } @@ -253,33 +260,37 @@ public void testHashDistributeMode() throws 
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
     );
+
+    // Initialize a BoundedSource table so that these rows are emitted in exactly one checkpoint.
+    List<Row> dataSet = IntStream.range(1, 1000)
+        .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+        .flatMap(List::stream)
+        .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" +
+        " WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
+
+    Assert.assertEquals("Should have the expected rows in source table.", Sets.newHashSet(dataSet),
+        Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));
+
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableProps));
 
     try {
       // Insert data set.
-      sql("INSERT INTO %s VALUES " +
-          "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
-          "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
-          "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+      sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
 
-      Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-      SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
-          SimpleDataUtil.createRecord(1, "aaa"),
-          SimpleDataUtil.createRecord(1, "bbb"),
-          SimpleDataUtil.createRecord(1, "ccc"),
-          SimpleDataUtil.createRecord(2, "aaa"),
-          SimpleDataUtil.createRecord(2, "bbb"),
-          SimpleDataUtil.createRecord(2, "ccc"),
-          SimpleDataUtil.createRecord(3, "aaa"),
-          SimpleDataUtil.createRecord(3, "bbb"),
-          SimpleDataUtil.createRecord(3, "ccc")
-      ));
+      Assert.assertEquals("Should have the expected rows in sink table.", Sets.newHashSet(dataSet),
+          Sets.newHashSet(sql("SELECT * FROM %s", tableName)));
 
       // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval,
       // thus producing multiple snapshots. Here we assert that each snapshot has only 1 file per partition.
+      Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
       Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
       for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
+        if (dataFiles.isEmpty()) {
+          continue;
+        }
+
         Assert.assertEquals("There should be 1 data file in partition 'aaa'", 1,
             SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size());
         Assert.assertEquals("There should be 1 data file in partition 'bbb'", 1,
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
index a039be9d5b97..b0041c3bc04d 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
@@ -23,6 +23,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -107,12 +109,25 @@ private BoundedTableSource(BoundedTableSource toCopy) {
 
     @Override
     public ChangelogMode getChangelogMode() {
-      return ChangelogMode.newBuilder()
-          .addContainedKind(RowKind.INSERT)
-          .addContainedKind(RowKind.DELETE)
-          .addContainedKind(RowKind.UPDATE_BEFORE)
-          .addContainedKind(RowKind.UPDATE_AFTER)
-          .build();
+      Supplier<Stream<Row>> supplier = () -> elementsPerCheckpoint.stream().flatMap(List::stream);
+
+      // Add the INSERT row kind by default.
+      ChangelogMode.Builder builder = ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT);
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.DELETE)) {
+        builder.addContainedKind(RowKind.DELETE);
+      }
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_BEFORE)) {
+        builder.addContainedKind(RowKind.UPDATE_BEFORE);
+      }
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_AFTER)) {
+        builder.addContainedKind(RowKind.UPDATE_AFTER);
+      }
+
+      return builder.build();
     }
 
     @Override
@@ -120,7 +135,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
       return new DataStreamScanProvider() {
         @Override
         public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
-          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);
+          boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);
 
           RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
           // Converter to convert the Row to RowData.
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
index 6f6712dea74e..54e44ee5b008 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
  * A stream source that:
@@ -39,6 +40,7 @@ public final class BoundedTestSource<T> implements SourceFunction<T>, CheckpointListener {
 
   private final List<List<T>> elementsPerCheckpoint;
+  private final boolean checkpointEnabled;
   private volatile boolean running = true;
 
   private final AtomicInteger numCheckpointsComplete = new AtomicInteger(0);
@@ -46,8 +48,13 @@ public final class BoundedTestSource<T> implements SourceFunction<T>, CheckpointListener {
   /**
    * Emits all those elements in several checkpoints.
    */
-  public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+  public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled) {
     this.elementsPerCheckpoint = elementsPerCheckpoint;
+    this.checkpointEnabled = checkpointEnabled;
+  }
+
+  public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+    this(elementsPerCheckpoint, true);
   }
 
   /**
@@ -59,7 +66,14 @@ public BoundedTestSource(T... elements) {
 
   @Override
   public void run(SourceContext<T> ctx) throws Exception {
-    for (int checkpoint = 0; checkpoint < elementsPerCheckpoint.size(); checkpoint++) {
+    if (!checkpointEnabled) {
+      Preconditions.checkArgument(elementsPerCheckpoint.size() <= 1,
+          "There should be at most one list in elementsPerCheckpoint when checkpointing is disabled.");
+      elementsPerCheckpoint.stream().flatMap(List::stream).forEach(ctx::collect);
+      return;
+    }
+
+    for (List<T> elements : elementsPerCheckpoint) {
 
       final int checkpointToAwait;
       synchronized (ctx.getCheckpointLock()) {
@@ -70,7 +84,7 @@ public void run(SourceContext<T> ctx) throws Exception {
         // affected in the end. Setting the delta to be 2 is introducing the variable that produce un-continuous
         // checkpoints that emit the records buffer from elementsPerCheckpoints.
         checkpointToAwait = numCheckpointsComplete.get() + 2;
-        for (T element : elementsPerCheckpoint.get(checkpoint)) {
+        for (T element : elements) {
           ctx.collect(element);
         }
       }