@@ -21,23 +21,28 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -58,6 +63,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
private static final String TABLE_NAME = "test_table";
private TableEnvironment tEnv;
private Table icebergTable;
@@ -127,6 +133,7 @@ public void before() {
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
BoundedTableFactory.clearDataSets();
super.clean();
}

@@ -253,33 +260,37 @@ public void testHashDistributeMode() throws Exception {
"write.format.default", format.name(),
TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
);

// Initialize a BoundedSource table that emits these rows in exactly one checkpoint.
List<Row> dataSet = IntStream.range(1, 1000)
.mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
.flatMap(List::stream)
.collect(Collectors.toList());
String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" +
" WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
Assert.assertEquals("Should have the expected rows in source table.", Sets.newHashSet(dataSet),
Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));

sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
tableName, toWithClause(tableProps));

try {
// Insert data set.
sql("INSERT INTO %s VALUES " +
"(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
"(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
"(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);

Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
SimpleDataUtil.createRecord(1, "aaa"),
SimpleDataUtil.createRecord(1, "bbb"),
SimpleDataUtil.createRecord(1, "ccc"),
SimpleDataUtil.createRecord(2, "aaa"),
SimpleDataUtil.createRecord(2, "bbb"),
SimpleDataUtil.createRecord(2, "ccc"),
SimpleDataUtil.createRecord(3, "aaa"),
SimpleDataUtil.createRecord(3, "bbb"),
SimpleDataUtil.createRecord(3, "ccc")
));
Assert.assertEquals("Should have the expected rows in sink table.", Sets.newHashSet(dataSet),
Sets.newHashSet(sql("SELECT * FROM %s", tableName)));

// If the job runs past the auto checkpoint interval we may have more than one checkpoint and thus
// multiple snapshots. Here we assert that each snapshot has only 1 data file per partition.
Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
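// Skip snapshots that committed no data files (e.g. a checkpoint that carried no new records).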
if (dataFiles.isEmpty()) {
continue;
}

Assert.assertEquals("There should be 1 data file in partition 'aaa'", 1,
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size());
Assert.assertEquals("There should be 1 data file in partition 'bbb'", 1,
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -107,20 +109,34 @@ private BoundedTableSource(BoundedTableSource toCopy) {

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
Supplier<Stream<Row>> supplier = () -> elementsPerCheckpoint.stream().flatMap(List::stream);

// Add the INSERT row kind by default.
ChangelogMode.Builder builder = ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT);

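// Advertise the other change kinds only if the registered data set actually contains rows of that kind.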
if (supplier.get().anyMatch(r -> r.getKind() == RowKind.DELETE)) {
builder.addContainedKind(RowKind.DELETE);
}

if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_BEFORE)) {
builder.addContainedKind(RowKind.UPDATE_BEFORE);
}

if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_AFTER)) {
builder.addContainedKind(RowKind.UPDATE_AFTER);
}

return builder.build();
}

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);
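// Checkpointing may be disabled (e.g. in batch jobs); pass the flag so the bounded source can
// emit all rows in one shot instead of one buffer per checkpoint.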
boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);

RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
// Converter to convert the Row to RowData.
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* A stream source that:
@@ -39,15 +40,21 @@
public final class BoundedTestSource<T> implements SourceFunction<T>, CheckpointListener {

private final List<List<T>> elementsPerCheckpoint;
private final boolean checkpointEnabled;
private volatile boolean running = true;

private final AtomicInteger numCheckpointsComplete = new AtomicInteger(0);

/**
* Emits the given elements across several checkpoints, or all at once when checkpointing is disabled.
*/
public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled) {
this.elementsPerCheckpoint = elementsPerCheckpoint;
this.checkpointEnabled = checkpointEnabled;
}

public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
this(elementsPerCheckpoint, true);
}

/**
@@ -59,7 +66,14 @@ public BoundedTestSource(T... elements) {

@Override
public void run(SourceContext<T> ctx) throws Exception {
for (int checkpoint = 0; checkpoint < elementsPerCheckpoint.size(); checkpoint++) {
if (!checkpointEnabled) {
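// With checkpointing disabled there are no checkpoint-complete notifications to drive the
// per-checkpoint emission, so require a single buffer and emit it immediately.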
Preconditions.checkArgument(elementsPerCheckpoint.size() <= 1,
"There should be at most one list in the elementsPerCheckpoint when checkpoint is disabled.");
elementsPerCheckpoint.stream().flatMap(List::stream).forEach(ctx::collect);
return;
}

for (List<T> elements : elementsPerCheckpoint) {

final int checkpointToAwait;
synchronized (ctx.getCheckpointLock()) {
@@ -70,7 +84,7 @@ public void run(SourceContext<T> ctx) throws Exception {
// affected in the end. Setting the delta to 2 deliberately makes the checkpoints that emit the record
// buffers from elementsPerCheckpoint non-consecutive.
checkpointToAwait = numCheckpointsComplete.get() + 2;
for (T element : elementsPerCheckpoint.get(checkpoint)) {
for (T element : elements) {
ctx.collect(element);
}
}