diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 9cce4a04308a..0c30b09166fc 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -21,6 +21,8 @@
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -28,6 +30,7 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
@@ -35,9 +38,11 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -58,6 +63,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
   @ClassRule
   public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
+  private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
   private static final String TABLE_NAME = "test_table";
   private TableEnvironment tEnv;
   private Table icebergTable;
@@ -127,6 +133,7 @@ public void before() {
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    BoundedTableFactory.clearDataSets();
     super.clean();
   }
 
@@ -253,33 +260,37 @@ public void testHashDistributeMode() throws Exception {
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
     );
+
+    // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint.
+    List<Row> dataSet = IntStream.range(1, 1000)
+        .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+        .flatMap(List::stream)
+        .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" +
+        " WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
+
+    Assert.assertEquals("Should have the expected rows in source table.", Sets.newHashSet(dataSet),
+        Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));
+
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
         tableName, toWithClause(tableProps));
 
     try {
       // Insert data set.
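Note on the test change above: the hash-distribute assertion (one data file per partition per snapshot) only holds if the whole data set lands in a single checkpoint, which INSERT ... VALUES cannot guarantee under an auto checkpoint interval. The BoundedSource registration pattern is two steps: hand BoundedTableFactory the per-checkpoint batches, then point a SQL table at the returned data id. A minimal sketch under those assumptions (the table name bounded_src and the sample rows are illustrative; registerDataSet, the BoundedSource connector name, and the data-id option are taken from this diff):

    // One inner list = the batch emitted before one checkpoint, so a single
    // inner list makes the source emit every row inside exactly one checkpoint.
    List<Row> batch = ImmutableList.of(Row.of(1, "aaa"), Row.of(2, "bbb"));
    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(batch));
    sql("CREATE TABLE bounded_src(id INT NOT NULL, data STRING NOT NULL)" +
        " WITH ('connector'='BoundedSource', 'data-id'='%s')", dataId);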
- sql("INSERT INTO %s VALUES " + - "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " + - "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " + - "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName); + sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE); - Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); - SimpleDataUtil.assertTableRecords(table, ImmutableList.of( - SimpleDataUtil.createRecord(1, "aaa"), - SimpleDataUtil.createRecord(1, "bbb"), - SimpleDataUtil.createRecord(1, "ccc"), - SimpleDataUtil.createRecord(2, "aaa"), - SimpleDataUtil.createRecord(2, "bbb"), - SimpleDataUtil.createRecord(2, "ccc"), - SimpleDataUtil.createRecord(3, "aaa"), - SimpleDataUtil.createRecord(3, "bbb"), - SimpleDataUtil.createRecord(3, "ccc") - )); + Assert.assertEquals("Should have the expected rows in sink table.", Sets.newHashSet(dataSet), + Sets.newHashSet(sql("SELECT * FROM %s", tableName))); // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval, // thus producing multiple snapshots. Here we assert that each snapshot has only 1 file per partition. + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); Map> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table); for (List dataFiles : snapshotToDataFiles.values()) { + if (dataFiles.isEmpty()) { + continue; + } + Assert.assertEquals("There should be 1 data file in partition 'aaa'", 1, SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size()); Assert.assertEquals("There should be 1 data file in partition 'bbb'", 1, diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java index a039be9d5b97..b0041c3bc04d 100644 --- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java +++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Stream; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -107,12 +109,25 @@ private BoundedTableSource(BoundedTableSource toCopy) { @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.DELETE) - .addContainedKind(RowKind.UPDATE_BEFORE) - .addContainedKind(RowKind.UPDATE_AFTER) - .build(); + Supplier> supplier = () -> elementsPerCheckpoint.stream().flatMap(List::stream); + + // Add the INSERT row kind by default. 
+      ChangelogMode.Builder builder = ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT);
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.DELETE)) {
+        builder.addContainedKind(RowKind.DELETE);
+      }
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_BEFORE)) {
+        builder.addContainedKind(RowKind.UPDATE_BEFORE);
+      }
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_AFTER)) {
+        builder.addContainedKind(RowKind.UPDATE_AFTER);
+      }
+
+      return builder.build();
     }
 
     @Override
@@ -120,7 +135,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
       return new DataStreamScanProvider() {
         @Override
         public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
-          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);
+          boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);
 
           RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
           // Converter to convert the Row to RowData.
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
index 6f6712dea74e..54e44ee5b008 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
  * A stream source that:
@@ -39,6 +40,7 @@ public final class BoundedTestSource<T> implements SourceFunction<T>, CheckpointListener {
 
   private final List<List<T>> elementsPerCheckpoint;
+  private final boolean checkpointEnabled;
   private volatile boolean running = true;
 
   private final AtomicInteger numCheckpointsComplete = new AtomicInteger(0);
 
@@ -46,8 +48,13 @@ public final class BoundedTestSource<T> implements SourceFunction<T>, CheckpointListener {
   /**
    * Emits all those elements in several checkpoints.
    */
-  public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+  public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled) {
     this.elementsPerCheckpoint = elementsPerCheckpoint;
+    this.checkpointEnabled = checkpointEnabled;
+  }
+
+  public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+    this(elementsPerCheckpoint, true);
   }
 
   /**
@@ -59,7 +66,14 @@ public BoundedTestSource(T... elements) {
 
   @Override
   public void run(SourceContext<T> ctx) throws Exception {
-    for (int checkpoint = 0; checkpoint < elementsPerCheckpoint.size(); checkpoint++) {
+    if (!checkpointEnabled) {
+      Preconditions.checkArgument(elementsPerCheckpoint.size() <= 1,
+          "There should be at most one list in the elementsPerCheckpoint when checkpoint is disabled.");
+      elementsPerCheckpoint.stream().flatMap(List::stream).forEach(ctx::collect);
+      return;
+    }
+
+    for (List<T> elements : elementsPerCheckpoint) {
 
       final int checkpointToAwait;
       synchronized (ctx.getCheckpointLock()) {
@@ -70,7 +84,7 @@ public void run(SourceContext<T> ctx) throws Exception {
         // affected in the end. Setting the delta to be 2 is introducing the variable that produce un-continuous
         // checkpoints that emit the records buffer from elementsPerCheckpoints.
         checkpointToAwait = numCheckpointsComplete.get() + 2;
-        for (T element : elementsPerCheckpoint.get(checkpoint)) {
+        for (T element : elements) {
           ctx.collect(element);
         }
       }
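For context on how checkpointToAwait is eventually satisfied: BoundedTestSource also implements CheckpointListener, so the Flink runtime advances numCheckpointsComplete as checkpoints finish, and run() blocks until the counter catches up before emitting the next batch. A self-contained sketch of that await/notify pattern (the class name AwaitingSource is illustrative, and the exact waiting loop upstream may differ):

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.flink.api.common.state.CheckpointListener;

    class AwaitingSource implements CheckpointListener {
      private final AtomicInteger numCheckpointsComplete = new AtomicInteger(0);
      private volatile boolean running = true;

      // Called from run(): park until `checkpointToAwait` checkpoints have completed.
      // Must not hold the checkpoint lock here, or checkpoints could never complete.
      void awaitCheckpoint(int checkpointToAwait) throws InterruptedException {
        while (running && numCheckpointsComplete.get() < checkpointToAwait) {
          Thread.sleep(1);
        }
      }

      @Override
      public void notifyCheckpointComplete(long checkpointId) {
        numCheckpointsComplete.incrementAndGet();
      }
    }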